This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 2abf5b1bbb Single node META FATE data (#5127)
2abf5b1bbb is described below
commit 2abf5b1bbbbf72609dcd8142e49868c79b4ef79d
Author: Kevin Rathbun <[email protected]>
AuthorDate: Mon Dec 16 11:06:11 2024 -0500
Single node META FATE data (#5127)
- Moved all the fate data for a single `META` transaction into a single ZK
node
- Pushed all the data into `NodeValue` (renamed to `FateData`)
- Previously, `NodeValue` stored only `TStatus`, `FateReservation`, and
`FateKey`. `FateData` now additionally stores the `REPO` stack and `TxInfo`.
- Status enforcement added to `MetaFateStore` (previously only possible for
`UserFateStore`).
- Moved `testFateInitialConfigCorrectness()` from `UserFateStoreIT` to
`UserFateIT`
- Renamed `UserFateStoreIT` to `UserFateStatusEnforcementIT` (now extends a
new class `FateStatusEnforcementIT`)
- Now only tests status enforcement (previously status enforcement
+ `testFateInitialConfigCorrectness()`)
- Created `MetaFateStatusEnforcementIT` (extends `FateStatusEnforcementIT`)
- Tests that the new status enforcement in `MetaFateStore` works
- Created `FateStoreUtil`: moved the `createFateTable()` util here and
added the `MetaFateZKSetup` inner class (the counterpart to
`createFateTable()` for `UserFateStore`, but it sets up ZooKeeper for use in
`MetaFateStore`)
- Deleted the `injectStatus` method of `UserFateStoreIT` (now `UserFateStatusEnforcementIT`),
replacing it with the existing `setStatus`, which does the same thing
- Changed `StackOverflowException` to extend `RuntimeException`
(previously `Exception`)
- Deleted unnecessary preexisting catch and immediate re-throw of a
`StackOverflowException` in `MetaFateStore.FateTxStoreImpl.push(repo)`
- Cleaned up and refactored the `MetaFateStore` methods which mutate existing
FateData; these methods now share a single pattern: each one calls the new
`MetaFateStore.mutate()` method
---
.../accumulo/core/fate/AbstractFateStore.java | 15 +-
.../accumulo/core/fate/StackOverflowException.java | 2 +-
.../accumulo/core/fate/user/UserFateStore.java | 25 +-
.../core/fate/zookeeper/MetaFateStore.java | 565 ++++++++++++---------
.../test/fate/FateStatusEnforcementIT.java | 98 ++++
.../apache/accumulo/test/fate/FateStoreUtil.java | 111 ++++
.../apache/accumulo/test/fate/meta/MetaFateIT.java | 35 +-
.../fate/meta/MetaFateStatusEnforcementIT.java | 54 ++
.../test/fate/meta/MetaFateStoreFateIT.java | 102 ++--
.../test/fate/meta/MetaMultipleStoresIT.java | 21 +-
.../apache/accumulo/test/fate/user/UserFateIT.java | 56 +-
.../test/fate/user/UserFateInterleavingIT.java | 2 +-
.../fate/user/UserFateStatusEnforcementIT.java | 62 +++
.../test/fate/user/UserFateStoreFateIT.java | 2 +-
.../accumulo/test/fate/user/UserFateStoreIT.java | 232 ---------
.../test/fate/user/UserMultipleStoresIT.java | 2 +-
16 files changed, 811 insertions(+), 573 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 3bc322c3c2..749a326064 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,12 +76,22 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
return FateId.from(instanceType, UUID.randomUUID());
}
};
+ protected static final int MAX_REPOS = 100;
// The ZooKeeper lock for the process that's running this store instance
protected final ZooUtil.LockID lockID;
protected final Predicate<ZooUtil.LockID> isLockHeld;
protected final Map<FateId,CountDownTimer> deferred;
protected final FateIdGenerator fateIdGenerator;
+ // the statuses required to perform operations
+ public static final Set<TStatus> REQ_PUSH_STATUS =
Set.of(TStatus.IN_PROGRESS, TStatus.NEW);
+ public static final Set<TStatus> REQ_POP_STATUS =
+ Set.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL);
+ public static final Set<TStatus> REQ_DELETE_STATUS =
+ Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL,
TStatus.FAILED);
+ // all but UNKNOWN
+ public static final Set<TStatus> REQ_FORCE_DELETE_STATUS =
Set.of(TStatus.NEW, TStatus.SUBMITTED,
+ TStatus.SUCCESSFUL, TStatus.FAILED, TStatus.FAILED_IN_PROGRESS,
TStatus.IN_PROGRESS);
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
@@ -415,7 +426,7 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
unreservedRunnableCount.increment();
}
- protected byte[] serializeTxInfo(Serializable so) {
+ protected static byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
@@ -428,7 +439,7 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
}
}
- protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
+ protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java
b/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java
index 80956b71c7..e332024d60 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java
@@ -18,7 +18,7 @@
*/
package org.apache.accumulo.core.fate;
-public class StackOverflowException extends Exception {
+public class StackOverflowException extends RuntimeException {
public StackOverflowException(String msg) {
super(msg);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index c134db1840..195848e276 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -71,9 +71,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
private final String tableName;
private static final FateInstanceType fateInstanceType =
FateInstanceType.USER;
- private static final int maxRepos = 100;
private static final com.google.common.collect.Range<Integer> REPO_RANGE =
- com.google.common.collect.Range.closed(1, maxRepos);
+ com.google.common.collect.Range.closed(1, MAX_REPOS);
public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID
lockID,
Predicate<ZooUtil.LockID> isLockHeld) {
@@ -457,12 +456,12 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
Optional<Integer> top = findTop();
- if (top.filter(t -> t >= maxRepos).isPresent()) {
+ if (top.filter(t -> t >= MAX_REPOS).isPresent()) {
throw new StackOverflowException("Repo stack size too large");
}
FateMutator<T> fateMutator =
- newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW);
+
newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new));
fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
}
@@ -471,8 +470,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
verifyReservedAndNotDeleted(true);
Optional<Integer> top = findTop();
- top.ifPresent(t -> newMutator(fateId)
- .requireStatus(TStatus.FAILED_IN_PROGRESS,
TStatus.SUCCESSFUL).deleteRepo(t).mutate());
+ top.ifPresent(t ->
newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new))
+ .deleteRepo(t).mutate());
}
@Override
@@ -497,7 +496,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
verifyReservedAndNotDeleted(true);
var mutator = newMutator(fateId);
- mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED);
+ mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new));
mutator.delete().mutate();
this.deleted = true;
}
@@ -507,9 +506,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
verifyReservedAndNotDeleted(true);
var mutator = newMutator(fateId);
- // allow deletion of all txns other than UNKNOWN
- mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED,
- TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
+ mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new));
mutator.delete().mutate();
this.deleted = true;
}
@@ -537,14 +534,14 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
static Text invertRepo(int position) {
Preconditions.checkArgument(REPO_RANGE.contains(position),
- "Position %s is not in the valid range of [0,%s]", position, maxRepos);
- return new Text(String.format("%02d", maxRepos - position));
+ "Position %s is not in the valid range of [0,%s]", position,
MAX_REPOS);
+ return new Text(String.format("%02d", MAX_REPOS - position));
}
static Integer restoreRepo(Text invertedPosition) {
- int position = maxRepos - Integer.parseInt(invertedPosition.toString());
+ int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString());
Preconditions.checkArgument(REPO_RANGE.contains(position),
- "Position %s is not in the valid range of [0,%s]", position, maxRepos);
+ "Position %s is not in the valid range of [0,%s]", position,
MAX_REPOS);
return position;
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index 28c0904ffa..4a691417c6 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -29,16 +29,20 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
+import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
@@ -52,7 +56,6 @@ import org.apache.accumulo.core.fate.ReadOnlyRepo;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -104,7 +107,9 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
while (true) {
try {
FateId fateId = fateIdGenerator.newRandomId(fateInstanceType);
- zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
null).serialize(),
+ zk.putPersistentData(getTXPath(fateId),
+ new FateData<T>(TStatus.NEW, null, null, createEmptyRepoDeque(),
createEmptyTxInfo())
+ .serialize(),
NodeExistsPolicy.FAIL);
return fateId;
} catch (NodeExistsException nee) {
@@ -120,40 +125,43 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
try {
- byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
- new NodeValue(TStatus.NEW, reservation, fateKey).serialize(),
currSerNodeVal -> {
- // We are only returning a non-null value for the following cases:
- // 1) The existing NodeValue for fateId is exactly the same as the
value set for the
- // node if it doesn't yet exist:
- // TStatus = TStatus.NEW, FateReservation = reservation, FateKey =
fateKey
- // This might occur if there was a ZK server fault and the same
write is running a 2nd
- // time
- // 2) The existing NodeValue for fateId has:
- // TStatus = TStatus.NEW, no FateReservation present, FateKey =
fateKey
- // The fateId is NEW/unseeded and not reserved, so we can allow it
to be reserved
- // Note: returning null here will not change the value to null but
will return null
- NodeValue currNodeVal = new NodeValue(currSerNodeVal);
- if (currNodeVal.status == TStatus.NEW) {
- verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
- if (currNodeVal.isReservedBy(reservation)) {
- return currSerNodeVal;
- } else if (!currNodeVal.isReserved()) {
- // NEW/unseeded transaction and not reserved, so we can allow
it to be reserved
- return new NodeValue(TStatus.NEW, reservation,
fateKey).serialize();
- } else {
- // NEW/unseeded transaction reserved under a different
reservation
- return null;
- }
- } else {
- log.trace(
- "fate id {} tstatus {} fate key {} is reserved {} "
- + "has already been seeded with work (non-NEW status)",
- fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
- currNodeVal.isReserved());
- return null;
- }
- });
- if (nodeVal != null) {
+ byte[] newSerFateData =
+ zk.mutateOrCreate(getTXPath(fateId), new FateData<>(TStatus.NEW,
reservation, fateKey,
+ createEmptyRepoDeque(), createEmptyTxInfo()).serialize(),
currSerFateData -> {
+ // We are only returning a non-null value for the following
cases:
+ // 1) The existing node for fateId is exactly the same as the
value set for the
+ // node if it doesn't yet exist:
+ // TStatus = TStatus.NEW, FateReservation = reservation,
FateKey = fateKey
+ // This might occur if there was a ZK server fault and the
same write is running a
+ // 2nd
+ // time
+ // 2) The existing node for fateId has:
+ // TStatus = TStatus.NEW, no FateReservation present, FateKey
= fateKey
+ // The fateId is NEW/unseeded and not reserved, so we can
allow it to be reserved
+ FateData<T> currFateData = new FateData<>(currSerFateData);
+ if (currFateData.status == TStatus.NEW) {
+ verifyFateKey(fateId, currFateData.fateKey, fateKey);
+ if (currFateData.isReservedBy(reservation)) {
+ return currSerFateData;
+ } else if (!currFateData.isReserved()) {
+ // NEW/unseeded transaction and not reserved, so we can
allow it to be reserved
+ return new FateData<>(TStatus.NEW, reservation, fateKey,
createEmptyRepoDeque(),
+ createEmptyTxInfo()).serialize();
+ } else {
+ // NEW/unseeded transaction reserved under a different
reservation
+ // This will not change the value and will return null
+ return null;
+ }
+ } else {
+ log.trace(
+ "fate id {} tstatus {} fate key {} is reserved {} "
+ + "has already been seeded with work (non-NEW
status)",
+ fateId, currFateData.status,
currFateData.fateKey.orElse(null),
+ currFateData.isReserved());
+ return null;
+ }
+ });
+ if (newSerFateData != null) {
return Optional.of(new FateTxStoreImpl(fateId, reservation));
} else {
return Optional.empty();
@@ -214,32 +222,35 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
// uniquely identify this attempt to reserve the fate operation data
FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
- try {
- byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId),
currSerNodeVal -> {
- NodeValue currNodeVal = new NodeValue(currSerNodeVal);
- // The uuid handles the case where there was a ZK server fault and the
write for this thread
- // went through but that was not acknowledged, and we are reading our
own write for 2nd
- // time.
- if (!currNodeVal.isReserved() ||
currNodeVal.isReservedBy(reservation)) {
- FateKey currFateKey = currNodeVal.fateKey.orElse(null);
- // Add the FateReservation to the node to reserve
- return new NodeValue(currNodeVal.status, reservation,
currFateKey).serialize();
- } else {
- // This will not change the value to null but will return null
- return null;
- }
- });
- if (newSerNodeVal != null) {
- return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ // The uuid handles the case where there was a ZK server fault and the
write for this thread
+ // went through but that was not acknowledged, and we are reading our
own write for 2nd
+ // time.
+ if (!currFateData.isReserved() ||
currFateData.isReservedBy(reservation)) {
+ // Add the FateReservation to the node to reserve
+ return new FateData<>(currFateData.status, reservation,
currFateData.fateKey.orElse(null),
+ currFateData.repoDeque, currFateData.txInfo);
} else {
- return Optional.empty();
+ // This will not change the value and will return null
+ return null;
}
- } catch (KeeperException.NoNodeException e) {
+ };
+
+ byte[] newSerFateData;
+ try {
+ newSerFateData = mutate(fateId, fateDataOp);
+ } catch (KeeperException.NoNodeException nne) {
log.trace("Tried to reserve a transaction {} that does not exist",
fateId);
return Optional.empty();
- } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
+ } catch (KeeperException e) {
throw new IllegalStateException(e);
}
+
+ if (newSerFateData != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
}
@Override
@@ -250,25 +261,27 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
if (isLockHeld.test(reservation.getLockID())) {
continue;
}
+
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ // Make sure the current node is still reserved and reserved with the
expected reservation
+ // and it is dead
+ if (currFateData.isReservedBy(reservation)
+ &&
!isLockHeld.test(currFateData.reservation.orElseThrow().getLockID())) {
+ // Delete the reservation
+ log.trace("Deleted the dead reservation {} for fate id {}",
reservation, fateId);
+ return new FateData<>(currFateData.status, null,
currFateData.fateKey.orElse(null),
+ currFateData.repoDeque, currFateData.txInfo);
+ } else {
+ // This will not change the value and will return null
+ return null;
+ }
+ };
+
try {
- zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
- NodeValue currNodeVal = new NodeValue(currSerNodeVal);
- // Make sure the current node is still reserved and reserved with
the expected reservation
- // and it is dead
- if (currNodeVal.isReservedBy(reservation)
- &&
!isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) {
- // Delete the reservation
- log.trace("Deleted the dead reservation {} for fate id {}",
reservation, fateId);
- return new NodeValue(currNodeVal.status, null,
currNodeVal.fateKey.orElse(null))
- .serialize();
- } else {
- // No change
- return null;
- }
- });
- } catch (KeeperException.NoNodeException e) {
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException.NoNodeException nne) {
// the node has since been deleted. Can safely ignore
- } catch (KeeperException | InterruptedException |
AcceptableThriftTableOperationException e) {
+ } catch (KeeperException e) {
throw new RuntimeException(e);
}
}
@@ -294,70 +307,46 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
@Override
public Repo<T> top() {
verifyReservedAndNotDeleted(false);
+ String txpath = getTXPath(fateId);
for (int i = 0; i < RETRIES; i++) {
- String txpath = getTXPath(fateId);
- try {
- String top = findTop(txpath);
- if (top == null) {
- return null;
- }
+ FateData<T> fateData = getFateData(fateId);
- byte[] ser = zk.getData(txpath + "/" + top);
- @SuppressWarnings("unchecked")
- var deserialized = (Repo<T>) deserialize(ser);
- return deserialized;
- } catch (KeeperException.NoNodeException ex) {
- log.debug("zookeeper error reading " + txpath + ": " + ex, ex);
+ if (fateData.status == TStatus.UNKNOWN) {
+ log.debug("zookeeper error reading fate data for {} at {}", fateId,
txpath);
sleepUninterruptibly(100, MILLISECONDS);
continue;
- } catch (KeeperException | InterruptedException e) {
- throw new IllegalStateException(e);
}
- }
- return null;
- }
- private String findTop(String txpath) throws KeeperException,
InterruptedException {
- List<String> ops;
- try {
- ops = zk.getChildren(txpath);
- } catch (NoNodeException e) {
- return null;
- }
-
- ops = new ArrayList<>(ops);
-
- String max = "";
-
- for (String child : ops) {
- if (child.startsWith("repo_") && child.compareTo(max) > 0) {
- max = child;
+ var repoDeque = fateData.repoDeque;
+ if (repoDeque.isEmpty()) {
+ return null;
+ } else {
+ return repoDeque.peek();
}
}
-
- if (max.isEmpty()) {
- return null;
- }
-
- return max;
+ return null;
}
@Override
public void push(Repo<T> repo) throws StackOverflowException {
verifyReservedAndNotDeleted(true);
- String txpath = getTXPath(fateId);
- try {
- String top = findTop(txpath);
- if (top != null && Long.parseLong(top.split("_")[1]) > 100) {
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ Preconditions.checkState(REQ_PUSH_STATUS.contains(currFateData.status),
+ "Tried to push to the repo stack for %s when the transaction
status is %s", fateId,
+ currFateData.status);
+ var repoDeque = currFateData.repoDeque;
+ if (repoDeque.size() >= MAX_REPOS) {
throw new StackOverflowException("Repo stack size too large");
}
+ repoDeque.push(repo);
+ return currFateData;
+ };
- zk.putPersistentSequential(txpath + "/repo_", serialize(repo));
- } catch (StackOverflowException soe) {
- throw soe;
- } catch (KeeperException | InterruptedException e) {
+ try {
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException e) {
throw new IllegalStateException(e);
}
}
@@ -366,14 +355,24 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
public void pop() {
verifyReservedAndNotDeleted(true);
- try {
- String txpath = getTXPath(fateId);
- String top = findTop(txpath);
- if (top == null) {
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ Preconditions.checkState(REQ_POP_STATUS.contains(currFateData.status),
+ "Tried to pop from the repo stack for %s when the transaction
status is %s", fateId,
+ currFateData.status);
+ var repoDeque = currFateData.repoDeque;
+
+ if (repoDeque.isEmpty()) {
throw new IllegalStateException("Tried to pop when empty " + fateId);
+ } else {
+ repoDeque.pop();
}
- zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP);
- } catch (KeeperException | InterruptedException e) {
+
+ return currFateData;
+ };
+
+ try {
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException e) {
throw new IllegalStateException(e);
}
}
@@ -382,23 +381,22 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
public void setStatus(TStatus status) {
verifyReservedAndNotDeleted(true);
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ // Ensure the FateId is reserved in ZK, and it is reserved with the
expected reservation
+ if (currFateData.isReservedBy(this.reservation)) {
+ return new FateData<>(status, currFateData.reservation.orElseThrow(),
+ currFateData.fateKey.orElse(null), currFateData.repoDeque,
currFateData.txInfo);
+ } else {
+ throw new IllegalStateException("Either the FateId " + fateId
+ + " is not reserved in ZK, or it is but the reservation in ZK: "
+ + currFateData.reservation.orElse(null) + " differs from that in
the store: "
+ + this.reservation);
+ }
+ };
+
try {
- zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
- NodeValue currNodeVal = new NodeValue(currSerializedData);
- // Ensure the FateId is reserved in ZK, and it is reserved with the
expected reservation
- if (currNodeVal.isReservedBy(this.reservation)) {
- FateReservation currFateReservation =
currNodeVal.reservation.orElseThrow();
- FateKey currFateKey = currNodeVal.fateKey.orElse(null);
- NodeValue newNodeValue = new NodeValue(status,
currFateReservation, currFateKey);
- return newNodeValue.serialize();
- } else {
- throw new IllegalStateException("Either the FateId " + fateId
- + " is not reserved in ZK, or it is but the reservation in ZK:
"
- + currNodeVal.reservation.orElse(null) + " differs from that
in the store: "
- + this.reservation);
- }
- });
- } catch (KeeperException | InterruptedException |
AcceptableThriftTableOperationException e) {
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException e) {
throw new IllegalStateException(e);
}
@@ -407,30 +405,51 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
@Override
public void delete() {
- verifyReservedAndNotDeleted(true);
-
- try {
- zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP);
- this.deleted = true;
- } catch (KeeperException | InterruptedException e) {
- throw new IllegalStateException(e);
- }
+ _delete(REQ_DELETE_STATUS);
}
@Override
public void forceDelete() {
- delete();
+ _delete(REQ_FORCE_DELETE_STATUS);
+ }
+
+ private void _delete(Set<TStatus> requiredStatus) {
+ verifyReservedAndNotDeleted(true);
+
+ // atomically check the txn status and delete the node
+ // retry until we either atomically delete the node or the txn status is
disallowed
+ while (!this.deleted) {
+ Stat stat = new Stat();
+ FateData<T> fateData = getFateData(fateId, stat);
+ Preconditions.checkState(requiredStatus.contains(fateData.status),
+ "Tried to delete fate data for %s when the transaction status is
%s", fateId,
+ fateData.status);
+ try {
+ zk.deleteStrict(getTXPath(fateId), stat.getVersion());
+ this.deleted = true;
+ } catch (KeeperException.BadVersionException e) {
+ log.trace(
+ "Deletion of ZK node fate data for {} was not able to be
completed atomically... Retrying",
+ fateId);
+ } catch (InterruptedException | KeeperException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
@Override
public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) {
verifyReservedAndNotDeleted(true);
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ currFateData.txInfo.put(txInfo, so);
+ return currFateData;
+ };
+
try {
- zk.putPersistentData(getTXPath(fateId) + "/" + txInfo,
serializeTxInfo(so),
- NodeExistsPolicy.OVERWRITE);
- } catch (KeeperException | InterruptedException e2) {
- throw new IllegalStateException(e2);
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(e);
}
}
@@ -456,100 +475,74 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
@Override
public List<ReadOnlyRepo<T>> getStack() {
verifyReservedAndNotDeleted(false);
- String txpath = getTXPath(fateId);
-
- outer: while (true) {
- List<String> ops;
- try {
- ops = zk.getChildren(txpath);
- } catch (KeeperException.NoNodeException e) {
- return Collections.emptyList();
- } catch (KeeperException | InterruptedException e1) {
- throw new IllegalStateException(e1);
- }
-
- ops = new ArrayList<>(ops);
- ops.sort(Collections.reverseOrder());
-
- ArrayList<ReadOnlyRepo<T>> dops = new ArrayList<>();
-
- for (String child : ops) {
- if (child.startsWith("repo_")) {
- byte[] ser;
- try {
- ser = zk.getData(txpath + "/" + child);
- @SuppressWarnings("unchecked")
- var repo = (ReadOnlyRepo<T>) deserialize(ser);
- dops.add(repo);
- } catch (KeeperException.NoNodeException e) {
- // children changed so start over
- continue outer;
- } catch (KeeperException | InterruptedException e) {
- throw new IllegalStateException(e);
- }
- }
- }
- return dops;
- }
+ FateData<T> fateData = getFateData(fateId);
+ return new ArrayList<>(fateData.repoDeque);
}
@Override
protected void unreserve() {
- try {
- if (!this.deleted) {
- zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
- NodeValue currNodeVal = new NodeValue(currSerNodeVal);
- FateKey currFateKey = currNodeVal.fateKey.orElse(null);
- if (currNodeVal.isReservedBy(this.reservation)) {
- // Remove the FateReservation from the NodeValue to unreserve
- return new NodeValue(currNodeVal.status, null,
currFateKey).serialize();
- } else {
- // possible this is running a 2nd time in zk server fault
conditions and its first
- // write went through
- if (!currNodeVal.isReserved()) {
- log.trace("The FATE reservation for fate id {} does not exist
in ZK", fateId);
- } else if
(!currNodeVal.reservation.orElseThrow().equals(this.reservation)) {
- log.debug(
- "The FATE reservation for fate id {} in ZK differs from
that in the store",
- fateId);
- }
- return null;
- }
- });
+ UnaryOperator<FateData<T>> fateDataOp = currFateData -> {
+ if (currFateData.isReservedBy(this.reservation)) {
+ // Remove the FateReservation from the node to unreserve
+ return new FateData<>(currFateData.status, null,
currFateData.fateKey.orElse(null),
+ currFateData.repoDeque, currFateData.txInfo);
+ } else {
+ // possible this is running a 2nd time in zk server fault conditions
and its first
+ // write went through
+ if (!currFateData.isReserved()) {
+ log.trace("The FATE reservation for fate id {} does not exist in
ZK", fateId);
+ } else if
(!currFateData.reservation.orElseThrow().equals(this.reservation)) {
+ log.debug("The FATE reservation for fate id {} in ZK differs from
that in the store",
+ fateId);
+ }
+ // This will not change the value and will return null
+ return null;
+ }
+ };
+
+ if (!this.deleted) {
+ try {
+ mutate(fateId, fateDataOp);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(e);
}
- this.reservation = null;
- } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
- throw new IllegalStateException(e);
}
+ this.reservation = null;
}
}
private Serializable getTransactionInfo(TxInfo txInfo, FateId fateId) {
- try {
- return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" +
txInfo));
- } catch (NoNodeException nne) {
- return null;
- } catch (KeeperException | InterruptedException e) {
- throw new IllegalStateException(e);
- }
+ return getFateData(fateId).txInfo.get(txInfo);
}
@Override
protected TStatus _getStatus(FateId fateId) {
- return getNode(fateId).status;
+ return getFateData(fateId).status;
}
@Override
protected Optional<FateKey> getKey(FateId fateId) {
- return getNode(fateId).fateKey;
+ return getFateData(fateId).fateKey;
}
- private NodeValue getNode(FateId fateId) {
+ private FateData<T> getFateData(FateId fateId) {
try {
- return new NodeValue(zk.getData(getTXPath(fateId)));
+ return new FateData<>(zk.getData(getTXPath(fateId)));
} catch (NoNodeException nne) {
- return new NodeValue(TStatus.UNKNOWN, null);
+ return new FateData<>(TStatus.UNKNOWN, null, null,
createEmptyRepoDeque(),
+ createEmptyTxInfo());
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private FateData<T> getFateData(FateId fateId, Stat stat) {
+ try {
+ return new FateData<>(zk.getData(getTXPath(fateId), stat));
+ } catch (NoNodeException nne) {
+ return new FateData<>(TStatus.UNKNOWN, null, null,
createEmptyRepoDeque(),
+ createEmptyTxInfo());
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
@@ -569,7 +562,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
// Memoizing for two reasons. First the status or reservation may
never be requested, so
// in that case avoid the lookup. Second, if it's requested multiple
times the result will
// always be consistent.
- Supplier<NodeValue> nodeSupplier = Suppliers.memoize(() ->
getNode(fateId));
+ Supplier<FateData<T>> nodeSupplier = Suppliers.memoize(() ->
getFateData(fateId));
return new FateIdStatusBase(fateId) {
@Override
public TStatus getStatus() {
@@ -599,30 +592,72 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
.filter(fateKey -> fateKey.getType() == type);
}
- protected static class NodeValue {
+ private Deque<Repo<T>> createEmptyRepoDeque() {
+ return new ArrayDeque<>();
+ }
+
+ private Map<TxInfo,Serializable> createEmptyTxInfo() {
+ return new EnumMap<>(TxInfo.class);
+ }
+
+ /**
+ * Mutate the existing FateData for the given fateId using the given
operator.
+ *
+ * @param fateId the fateId for the FateData to change
+ * @param fateDataOp the operation to apply to the existing FateData. Op
should return null if no
+ * change is desired. Otherwise, should return the new FateData with
the desired changes
+ * @return the resulting serialized FateData or null if the op resulted in
no change
+ */
+ private byte[] mutate(FateId fateId, UnaryOperator<FateData<T>> fateDataOp)
+ throws KeeperException {
+ try {
+ return zk.mutateExisting(getTXPath(fateId), currSerFateData -> {
+ FateData<T> currFateData = new FateData<>(currSerFateData);
+ FateData<T> newFateData = fateDataOp.apply(currFateData);
+ if (newFateData == null) {
+ // This will not change the value and will return null
+ return null;
+ } else {
+ return newFateData.serialize();
+ }
+ });
+ } catch (InterruptedException | AcceptableThriftTableOperationException e)
{
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected static class FateData<T> {
final TStatus status;
final Optional<FateKey> fateKey;
final Optional<FateReservation> reservation;
-
- private NodeValue(byte[] serializedData) {
+ final Deque<Repo<T>> repoDeque;
+ final Map<TxInfo,Serializable> txInfo;
+
+ /**
+ * Construct a FateData from a previously {@link #serialize()}ed FateData
+ *
+ * @param serializedData the serialized data
+ */
+ private FateData(byte[] serializedData) {
try (DataInputBuffer buffer = new DataInputBuffer()) {
buffer.reset(serializedData, serializedData.length);
this.status = TStatus.valueOf(buffer.readUTF());
this.reservation = deserializeFateReservation(buffer);
this.fateKey = deserializeFateKey(buffer);
+ this.repoDeque = deserializeRepoDeque(buffer);
+ this.txInfo = deserializeTxInfo(buffer);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
- private NodeValue(TStatus status, FateReservation reservation) {
- this(status, reservation, null);
- }
-
- private NodeValue(TStatus status, FateReservation reservation, FateKey
fateKey) {
+ private FateData(TStatus status, FateReservation reservation, FateKey
fateKey,
+ Deque<Repo<T>> repoDeque, Map<TxInfo,Serializable> txInfo) {
this.status = Objects.requireNonNull(status);
this.reservation = Optional.ofNullable(reservation);
this.fateKey = Optional.ofNullable(fateKey);
+ this.repoDeque = Objects.requireNonNull(repoDeque);
+ this.txInfo = Objects.requireNonNull(txInfo);
}
private Optional<FateKey> deserializeFateKey(DataInputBuffer buffer)
throws IOException {
@@ -644,10 +679,46 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
return Optional.empty();
}
- byte[] serialize() {
+ private Deque<Repo<T>> deserializeRepoDeque(DataInputBuffer buffer) throws
IOException {
+ Deque<Repo<T>> deque = new ArrayDeque<>();
+ int numRepos = buffer.readInt();
+
+ for (int i = 0; i < numRepos; i++) {
+ int length = buffer.readInt();
+ Preconditions.checkArgument(length > 0);
+ @SuppressWarnings("unchecked")
+ var repo = (Repo<T>) deserialize(buffer.readNBytes(length));
+ deque.add(repo);
+ }
+
+ return deque;
+ }
+
+ private Map<TxInfo,Serializable> deserializeTxInfo(DataInputBuffer buffer)
throws IOException {
+ Map<TxInfo,Serializable> txInfo = new EnumMap<>(TxInfo.class);
+ int length = buffer.readInt();
+
+ while (length != 0) {
+ Preconditions.checkArgument(length >= 0);
+ TxInfo type = TxInfo.values()[buffer.readInt()];
+ txInfo.put(type, AbstractFateStore.deserializeTxInfo(type,
buffer.readNBytes(length - 1)));
+
+ // if we have reached the end of the buffer (= reached the end of the
tx info data)
+ if (buffer.getPosition() == buffer.getLength()) {
+ break;
+ }
+ length = buffer.readInt();
+ }
+
+ return txInfo;
+ }
+
+ private byte[] serialize() {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
+ // status
dos.writeUTF(status.name());
+ // reservation
if (isReserved()) {
byte[] serializedFateReservation =
reservation.orElseThrow().getSerialized();
dos.writeInt(serializedFateReservation.length);
@@ -655,6 +726,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
} else {
dos.writeInt(0);
}
+ // fate key
if (fateKey.isPresent()) {
byte[] serializedFateKey = fateKey.orElseThrow().getSerialized();
dos.writeInt(serializedFateKey.length);
@@ -662,6 +734,27 @@ public class MetaFateStore<T> extends AbstractFateStore<T>
{
} else {
dos.writeInt(0);
}
+ // repo deque
+ byte[] serializedRepo;
+ dos.writeInt(repoDeque.size());
+ // iterates from top/first/head to bottom/last/tail
+ for (Repo<T> repo : repoDeque) {
+ serializedRepo = AbstractFateStore.serialize(repo);
+ dos.writeInt(serializedRepo.length);
+ dos.write(serializedRepo);
+ }
+ // tx info
+ if (!txInfo.isEmpty()) {
+ for (var elt : txInfo.entrySet()) {
+ byte[] serTxInfo = serializeTxInfo(elt.getValue());
+ dos.writeInt(1 + serTxInfo.length);
+ dos.writeInt(elt.getKey().ordinal());
+ dos.write(serTxInfo);
+ }
+ } else {
+ dos.writeInt(0);
+ }
+ // done
dos.close();
return baos.toByteArray();
} catch (IOException e) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java
new file mode 100644
index 0000000000..5ca1a08b8d
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+public abstract class FateStatusEnforcementIT extends SharedMiniClusterBase {
+
+ protected FateId fateId;
+ protected FateStore<FateTestRunner.TestEnv> store;
+ protected FateStore.FateTxStore<FateIT.TestEnv> txStore;
+
+ @Test
+ public void push() throws Exception {
+ testOperationWithStatuses(() -> {}, // No special setup needed for push
+ () -> txStore.push(new FateIT.TestRepo("testOp")),
AbstractFateStore.REQ_PUSH_STATUS);
+ }
+
+ @Test
+ public void pop() throws Exception {
+ testOperationWithStatuses(() -> {
+ // Setup for pop: Ensure there is something to pop by first pushing
+ try {
+ txStore.setStatus(ReadOnlyFateStore.TStatus.NEW);
+ txStore.push(new FateIT.TestRepo("testOp"));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to setup for pop", e);
+ }
+ }, txStore::pop, AbstractFateStore.REQ_POP_STATUS);
+ }
+
+ @Test
+ public void delete() throws Exception {
+ testOperationWithStatuses(() -> {
+ // Setup for delete: Create a new txStore before each delete since
delete cannot be called
+ // on the same txStore more than once
+ fateId = store.create();
+ txStore = store.reserve(fateId);
+ }, () -> txStore.delete(), AbstractFateStore.REQ_DELETE_STATUS);
+ }
+
+ @Test
+ public void forceDelete() throws Exception {
+ testOperationWithStatuses(() -> {
+ // Setup for forceDelete: same as delete
+ fateId = store.create();
+ txStore = store.reserve(fateId);
+ }, () -> txStore.forceDelete(), AbstractFateStore.REQ_FORCE_DELETE_STATUS);
+ }
+
+ protected void testOperationWithStatuses(Runnable beforeOperation,
Executable operation,
+ Set<ReadOnlyFateStore.TStatus> acceptableStatuses) throws Exception {
+ for (ReadOnlyFateStore.TStatus status :
ReadOnlyFateStore.TStatus.values()) {
+ // Run any needed setup for the operation before each iteration
+ beforeOperation.run();
+
+ txStore.setStatus(status);
+ var fateIdStatus = store.list().filter(statusEntry ->
statusEntry.getFateId().equals(fateId))
+ .findFirst().orElseThrow();
+ assertEquals(status, fateIdStatus.getStatus());
+ if (!acceptableStatuses.contains(status)) {
+ assertThrows(IllegalStateException.class, operation,
+ "Expected operation to fail with status " + status + " but it did
not");
+ } else {
+ assertDoesNotThrow(operation,
+ "Expected operation to succeed with status " + status + " but it
did not");
+ }
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
new file mode 100644
index 0000000000..856fd11f26
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static
org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.TabletInformation;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.metadata.AccumuloTable;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.io.TempDir;
+
+import com.google.common.collect.MoreCollectors;
+
+/**
+ * A class with utility methods for testing UserFateStore and MetaFateStore
+ */
+public class FateStoreUtil {
+ /**
+ * Create the fate table with the exact same configuration as the real Fate user
instance table
+ * including table properties and TabletAvailability. For use in testing
UserFateStore
+ */
+ public static void createFateTable(ClientContext client, String table)
throws Exception {
+ final var fateTableProps =
+
client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName());
+
+ TabletAvailability availability;
+ try (var tabletStream = client.tableOperations()
+ .getTabletInformation(AccumuloTable.FATE.tableName(), new Range())) {
+ availability =
tabletStream.map(TabletInformation::getTabletAvailability).distinct()
+ .collect(MoreCollectors.onlyElement());
+ }
+
+ var newTableConf = new
NewTableConfiguration().withInitialTabletAvailability(availability)
+ .withoutDefaultIterators().setProperties(fateTableProps);
+ client.tableOperations().create(table, newTableConf);
+ var testFateTableProps =
client.tableOperations().getTableProperties(table);
+
+ // ensure that create did not set any other props
+ assertEquals(fateTableProps, testFateTableProps);
+ }
+
+ /**
+ * Contains the necessary utilities for setting up (and shutting down) a
ZooKeeper instance for
+ * use in testing MetaFateStore
+ */
+ @Tag(ZOOKEEPER_TESTING_SERVER)
+ public static class MetaFateZKSetup {
+ private static ZooKeeperTestingServer szk;
+ private static ZooReaderWriter zk;
+ private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID();
+ private static String ZK_FATE_PATH;
+
+ /**
+ * Sets up the ZooKeeper instance and creates the paths needed for testing
MetaFateStore
+ */
+ public static void setup(@TempDir File tempDir) throws Exception {
+ szk = new ZooKeeperTestingServer(tempDir);
+ zk = szk.getZooReaderWriter();
+ ZK_FATE_PATH = ZK_ROOT + Constants.ZFATE;
+ zk.mkdirs(ZK_FATE_PATH);
+ zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS);
+ }
+
+ /**
+ * Tears down the ZooKeeper instance
+ */
+ public static void teardown() throws Exception {
+ szk.close();
+ }
+
+ public static String getZkRoot() {
+ return ZK_ROOT;
+ }
+
+ public static ZooReaderWriter getZooReaderWriter() {
+ return zk;
+ }
+
+ public static String getZkFatePath() {
+ return ZK_FATE_PATH;
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
index a96dc11e71..ded495b958 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.test.fate.meta;
import static
org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
-import static
org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
@@ -27,59 +26,48 @@ import static org.easymock.EasyMock.replay;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.UUID;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateIT;
-import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.apache.accumulo.test.fate.FateStoreUtil;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
-@Tag(ZOOKEEPER_TESTING_SERVER)
public class MetaFateIT extends FateIT {
-
- private static ZooKeeperTestingServer szk = null;
- private static ZooReaderWriter zk = null;
- private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
- private static final String ZK_ROOT = ZooUtil.getRoot(IID);
-
@TempDir
private static File tempDir;
@BeforeAll
public static void setup() throws Exception {
- szk = new ZooKeeperTestingServer(tempDir);
- zk = szk.getZooReaderWriter();
- zk.mkdirs(ZK_ROOT + Constants.ZFATE);
- zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS);
+ FateStoreUtil.MetaFateZKSetup.setup(tempDir);
}
@AfterAll
public static void teardown() throws Exception {
- szk.close();
+ FateStoreUtil.MetaFateZKSetup.teardown();
}
@Override
public void executeTest(FateTestExecutor<TestEnv> testMethod, int
maxDeferred,
FateIdGenerator fateIdGenerator) throws Exception {
+ String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot();
+ var zooReaderWriter = FateStoreUtil.MetaFateZKSetup.getZooReaderWriter();
+ String fatePath = FateStoreUtil.MetaFateZKSetup.getZkFatePath();
ServerContext sctx = createMock(ServerContext.class);
- expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
- expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+ expect(sctx.getZooKeeperRoot()).andReturn(zkRoot).anyTimes();
+ expect(sctx.getZooReaderWriter()).andReturn(zooReaderWriter).anyTimes();
replay(sctx);
- testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk,
createDummyLockID(), null,
+ testMethod.execute(new MetaFateStore<>(fatePath, zooReaderWriter,
createDummyLockID(), null,
maxDeferred, fateIdGenerator), sctx);
}
@@ -98,8 +86,9 @@ public class MetaFateIT extends FateIT {
*/
private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId)
throws KeeperException, InterruptedException {
- zrw.sync(ZK_ROOT);
- String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE,
fateId.getTxUUIDStr());
+ String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot();
+ zrw.sync(zkRoot);
+ String txdir = String.format("%s%s/tx_%s", zkRoot, Constants.ZFATE,
fateId.getTxUUIDStr());
try (DataInputBuffer buffer = new DataInputBuffer()) {
var serialized = zrw.getData(txdir);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
new file mode 100644
index 0000000000..4800e6816e
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.meta;
+
+import java.io.File;
+
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.test.fate.FateStatusEnforcementIT;
+import org.apache.accumulo.test.fate.FateStoreUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MetaFateStatusEnforcementIT extends FateStatusEnforcementIT {
+ @TempDir
+ private static File tempDir;
+
+ @BeforeAll
+ public static void beforeAllSetup() throws Exception {
+ FateStoreUtil.MetaFateZKSetup.setup(tempDir);
+ }
+
+ @AfterAll
+ public static void afterAllTeardown() throws Exception {
+ FateStoreUtil.MetaFateZKSetup.teardown();
+ }
+
+ @BeforeEach
+ public void beforeEachSetup() throws Exception {
+ store = new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(),
+ FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(),
AbstractFateStore.createDummyLockID(),
+ null);
+ fateId = store.create();
+ txStore = store.reserve(fateId);
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
index fb9e9c7d75..66d01b1489 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
@@ -26,25 +26,26 @@ import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
+import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.Deque;
+import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateStoreIT;
-import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.apache.accumulo.test.fate.FateStoreUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@@ -52,37 +53,31 @@ import org.junit.jupiter.api.io.TempDir;
@Tag(ZOOKEEPER_TESTING_SERVER)
public class MetaFateStoreFateIT extends FateStoreIT {
-
- private static ZooKeeperTestingServer szk = null;
- private static ZooReaderWriter zk = null;
- private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
- private static final String ZK_ROOT = ZooUtil.getRoot(IID);
-
@TempDir
private static File tempDir;
@BeforeAll
public static void setup() throws Exception {
- szk = new ZooKeeperTestingServer(tempDir);
- zk = szk.getZooReaderWriter();
- zk.mkdirs(ZK_ROOT + Constants.ZFATE);
- zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS);
+ FateStoreUtil.MetaFateZKSetup.setup(tempDir);
}
@AfterAll
public static void teardown() throws Exception {
- szk.close();
+ FateStoreUtil.MetaFateZKSetup.teardown();
}
@Override
public void executeTest(FateTestExecutor<TestEnv> testMethod, int
maxDeferred,
FateIdGenerator fateIdGenerator) throws Exception {
ServerContext sctx = createMock(ServerContext.class);
- expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
- expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+
expect(sctx.getZooKeeperRoot()).andReturn(FateStoreUtil.MetaFateZKSetup.getZkRoot()).anyTimes();
+
expect(sctx.getZooReaderWriter()).andReturn(FateStoreUtil.MetaFateZKSetup.getZooReaderWriter())
+ .anyTimes();
replay(sctx);
- MetaFateStore<TestEnv> store = new MetaFateStore<>(ZK_ROOT +
Constants.ZFATE, zk,
- createDummyLockID(), null, maxDeferred, fateIdGenerator);
+ MetaFateStore<TestEnv> store =
+ new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(),
+ FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(),
createDummyLockID(), null,
+ maxDeferred, fateIdGenerator);
// Check that the store has no transactions before and after each test
assertEquals(0, store.list().count());
@@ -93,38 +88,49 @@ public class MetaFateStoreFateIT extends FateStoreIT {
@Override
protected void deleteKey(FateId fateId, ServerContext sctx) {
try {
- // We have to use reflection since the NodeValue is internal to the store
-
- // Grab both the constructors that use the serialized bytes and status,
reservation
- Class<?> nodeClass = Class.forName(MetaFateStore.class.getName() +
"$NodeValue");
- Constructor<?> statusReservationCons =
- nodeClass.getDeclaredConstructor(TStatus.class,
FateStore.FateReservation.class);
- Constructor<?> serializedCons =
nodeClass.getDeclaredConstructor(byte[].class);
- statusReservationCons.setAccessible(true);
+ // We have to use reflection since the FateData is internal to the store
+
+ Class<?> fateDataClass = Class.forName(MetaFateStore.class.getName() +
"$FateData");
+ // Constructor for constructing FateData
+ Constructor<?> fateDataCons =
fateDataClass.getDeclaredConstructor(TStatus.class,
+ FateStore.FateReservation.class, FateKey.class, Deque.class,
Map.class);
+ // Constructor for constructing FateData from a byte array (the
serialized form of FateData)
+ Constructor<?> serializedCons =
fateDataClass.getDeclaredConstructor(byte[].class);
+ fateDataCons.setAccessible(true);
serializedCons.setAccessible(true);
- // Get the status and reservation fields so they can be read and get the
serialize method
- Field nodeStatus = nodeClass.getDeclaredField("status");
- Field nodeReservation = nodeClass.getDeclaredField("reservation");
- Method nodeSerialize = nodeClass.getDeclaredMethod("serialize");
- nodeStatus.setAccessible(true);
- nodeReservation.setAccessible(true);
+ // Get the status, reservation, repoDeque, txInfo fields so that they
can be read and get the
+ // serialize method
+ Field status = fateDataClass.getDeclaredField("status");
+ Field reservation = fateDataClass.getDeclaredField("reservation");
+ Field repoDeque = fateDataClass.getDeclaredField("repoDeque");
+ Field txInfo = fateDataClass.getDeclaredField("txInfo");
+ Method nodeSerialize = fateDataClass.getDeclaredMethod("serialize");
+ status.setAccessible(true);
+ reservation.setAccessible(true);
+ repoDeque.setAccessible(true);
+ txInfo.setAccessible(true);
nodeSerialize.setAccessible(true);
- // Get the existing status and reservation for the node and build a new
node with an empty key
- // but uses the existing tid
- String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" +
fateId.getTxUUIDStr();
- Object currentNode = serializedCons.newInstance(new Object[]
{zk.getData(txPath)});
- TStatus currentStatus = (TStatus) nodeStatus.get(currentNode);
+ // Gather the existing fields, create a new FateData object with those
existing fields
+ // (excluding the FateKey in the new object), and replace the zk node
with this new FateData
+ String txPath =
+ FateStoreUtil.MetaFateZKSetup.getZkFatePath() + "/tx_" +
fateId.getTxUUIDStr();
+ Object currentNode = serializedCons.newInstance(
+ new Object[]
{FateStoreUtil.MetaFateZKSetup.getZooReaderWriter().getData(txPath)});
+ TStatus currentStatus = (TStatus) status.get(currentNode);
Optional<FateStore.FateReservation> currentReservation =
- getCurrentReservation(nodeReservation, currentNode);
- // replace the node with no key and just a tid and existing status and
reservation
- Object newNode =
- statusReservationCons.newInstance(currentStatus,
currentReservation.orElse(null));
-
- // Replace the transaction with the same status and reservation but no
key
- zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode),
- NodeExistsPolicy.OVERWRITE);
+ getCurrentReservation(reservation, currentNode);
+ @SuppressWarnings("unchecked")
+ Deque<Repo<TestEnv>> currentRepoDeque = (Deque<Repo<TestEnv>>)
repoDeque.get(currentNode);
+ @SuppressWarnings("unchecked")
+ Map<Fate.TxInfo,Serializable> currentTxInfo =
+ (Map<Fate.TxInfo,Serializable>) txInfo.get(currentNode);
+ Object newNode = fateDataCons.newInstance(currentStatus,
currentReservation.orElse(null),
+ null, currentRepoDeque, currentTxInfo);
+
+
FateStoreUtil.MetaFateZKSetup.getZooReaderWriter().putPersistentData(txPath,
+ (byte[]) nodeSerialize.invoke(newNode), NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
throw new IllegalStateException(e);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java
index a2866cb900..d5d1903493 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java
@@ -21,13 +21,11 @@ package org.apache.accumulo.test.fate.meta;
import java.io.File;
import java.util.function.Predicate;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.test.fate.FateStoreUtil;
import org.apache.accumulo.test.fate.MultipleStoresIT;
-import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -35,21 +33,16 @@ import org.junit.jupiter.api.io.TempDir;
public class MetaMultipleStoresIT extends MultipleStoresIT {
@TempDir
- private static File TEMP_DIR;
- private static ZooKeeperTestingServer SZK;
- private static ZooReaderWriter ZK;
- private static String FATE_DIR;
+ private static File tempDir;
@BeforeAll
public static void setup() throws Exception {
- SZK = new ZooKeeperTestingServer(TEMP_DIR);
- ZK = SZK.getZooReaderWriter();
- FATE_DIR = Constants.ZFATE;
+ FateStoreUtil.MetaFateZKSetup.setup(tempDir);
}
@AfterAll
public static void teardown() throws Exception {
- SZK.close();
+ FateStoreUtil.MetaFateZKSetup.teardown();
}
@Override
@@ -68,7 +61,8 @@ public class MetaMultipleStoresIT extends MultipleStoresIT {
@Override
public FateStore<SleepingTestEnv> create(ZooUtil.LockID lockID,
Predicate<ZooUtil.LockID> isLockHeld) throws InterruptedException,
KeeperException {
- return new MetaFateStore<>(FATE_DIR, ZK, lockID, isLockHeld);
+ return new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(),
+ FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), lockID,
isLockHeld);
}
}
@@ -76,7 +70,8 @@ public class MetaMultipleStoresIT extends MultipleStoresIT {
@Override
public FateStore<LatchTestEnv> create(ZooUtil.LockID lockID,
Predicate<ZooUtil.LockID> isLockHeld) throws InterruptedException,
KeeperException {
- return new MetaFateStore<>(FATE_DIR, ZK, lockID, isLockHeld);
+ return new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(),
+ FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), lockID,
isLockHeld);
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
index 7f0383e6f4..014b6c97bc 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
@@ -19,26 +19,33 @@
package org.apache.accumulo.test.fate.user;
import static
org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
-import static
org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateIT;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
public class UserFateIT extends FateIT {
@@ -66,6 +73,53 @@ public class UserFateIT extends FateIT {
}
}
+ // UserFateStore only test:
+ // Test that configs related to the correctness of the FATE instance user table
+ // are initialized correctly
+ @Test
+ public void testFateInitialConfigCorrectness() throws Exception {
+ try (ClientContext client =
+ (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+
+ // It is important here to use getTableProperties() and not getConfiguration()
+ // because we want only the table properties and not a merged view
+ var fateTableProps =
+ client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName());
+
+ // Verify properties all have a table. prefix
+ assertTrue(fateTableProps.keySet().stream().allMatch(key -> key.startsWith("table.")));
+
+ // Verify properties are correctly set
+ assertEquals("5", fateTableProps.get(Property.TABLE_FILE_REPLICATION.getKey()));
+ assertEquals("sync", fateTableProps.get(Property.TABLE_DURABILITY.getKey()));
+ assertEquals("false", fateTableProps.get(Property.TABLE_FAILURES_IGNORE.getKey()));
+ assertEquals("", fateTableProps.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey()));
+
+ // Verify VersioningIterator related properties are correct
+ var iterClass = "10," + VersioningIterator.class.getName();
+ var maxVersions = "1";
+ assertEquals(iterClass,
+ fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers"));
+ assertEquals(maxVersions, fateTableProps
+ .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions"));
+ assertEquals(iterClass,
+ fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers"));
+ assertEquals(maxVersions, fateTableProps
+ .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions"));
+ assertEquals(iterClass,
+ fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers"));
+ assertEquals(maxVersions, fateTableProps
+ .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions"));
+
+ // Verify all tablets are HOSTED
+ try (var tablets =
+ client.getAmple().readTablets().forTable(AccumuloTable.FATE.tableId()).build()) {
+ assertTrue(tablets.stream()
+ .allMatch(tm -> tm.getTabletAvailability() == TabletAvailability.HOSTED));
+ }
+ }
+ }
+
@Override
protected TStatus getTxStatus(ServerContext context, FateId fateId) {
try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
index 83a87db975..de30125e5d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
@@ -19,7 +19,7 @@
package org.apache.accumulo.test.fate.user;
import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
-import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java
new file mode 100644
index 0000000000..22ecb9fe6e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.user;
+
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateStatusEnforcementIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+public class UserFateStatusEnforcementIT extends FateStatusEnforcementIT {
+ private ClientContext client;
+ private String table;
+
+ @BeforeAll
+ public static void beforeAllSetup() throws Exception {
+ SharedMiniClusterBase.startMiniCluster();
+ }
+
+ @AfterAll
+ public static void afterAllTeardown() {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ @BeforeEach
+ public void beforeEachSetup() throws Exception {
+ client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
+ table = getUniqueNames(1)[0];
+ createFateTable(client, table);
+ store = new UserFateStore<>(client, table, AbstractFateStore.createDummyLockID(), null);
+ fateId = store.create();
+ txStore = store.reserve(fateId);
+ }
+
+ @AfterEach
+ public void afterEachTeardown() {
+ client.close();
+ }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
index c967fbea5a..17cc06a5ac 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.test.fate.user;
import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
-import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.BatchWriter;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
deleted file mode 100644
index ead901b7ed..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.fate.user;
-
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.EnumSet;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TabletAvailability;
-import org.apache.accumulo.core.client.admin.TabletInformation;
-import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.FateId;
-import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
-import org.apache.accumulo.core.fate.user.UserFateStore;
-import org.apache.accumulo.core.fate.user.schema.FateSchema;
-import org.apache.accumulo.core.iterators.user.VersioningIterator;
-import org.apache.accumulo.core.metadata.AccumuloTable;
-import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.test.fate.FateIT;
-import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
-import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.function.Executable;
-
-import com.google.common.collect.MoreCollectors;
-
-public class UserFateStoreIT extends SharedMiniClusterBase {
-
- @BeforeAll
- public static void setup() throws Exception {
- SharedMiniClusterBase.startMiniCluster();
- }
-
- @AfterAll
- public static void teardown() {
- SharedMiniClusterBase.stopMiniCluster();
- }
-
- // Test that configs related to the correctness of the FATE instance user table
- // are initialized correctly
- @Test
- public void testFateInitialConfigCorrectness() throws Exception {
- try (ClientContext client =
- (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
-
- // It is important here to use getTableProperties() and not getConfiguration()
- // because we want only the table properties and not a merged view
- var fateTableProps =
- client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName());
-
- // Verify properties all have a table. prefix
- assertTrue(fateTableProps.keySet().stream().allMatch(key -> key.startsWith("table.")));
-
- // Verify properties are correctly set
- assertEquals("5", fateTableProps.get(Property.TABLE_FILE_REPLICATION.getKey()));
- assertEquals("sync", fateTableProps.get(Property.TABLE_DURABILITY.getKey()));
- assertEquals("false", fateTableProps.get(Property.TABLE_FAILURES_IGNORE.getKey()));
- assertEquals("", fateTableProps.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey()));
-
- // Verify VersioningIterator related properties are correct
- var iterClass = "10," + VersioningIterator.class.getName();
- var maxVersions = "1";
- assertEquals(iterClass,
- fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers"));
- assertEquals(maxVersions, fateTableProps
- .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions"));
- assertEquals(iterClass,
- fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers"));
- assertEquals(maxVersions, fateTableProps
- .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions"));
- assertEquals(iterClass,
- fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers"));
- assertEquals(maxVersions, fateTableProps
- .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions"));
-
- // Verify all tablets are HOSTED
- try (var tablets =
- client.getAmple().readTablets().forTable(AccumuloTable.FATE.tableId()).build()) {
- assertTrue(tablets.stream()
- .allMatch(tm -> tm.getTabletAvailability() == TabletAvailability.HOSTED));
- }
- }
- }
-
- @Nested
- class TestStatusEnforcement {
-
- String tableName;
- ClientContext client;
- FateId fateId;
- UserFateStore<TestEnv> store;
- FateStore.FateTxStore<FateIT.TestEnv> txStore;
-
- @BeforeEach
- public void setup() throws Exception {
- client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
- tableName = getUniqueNames(1)[0];
- createFateTable(client, tableName);
- store = new UserFateStore<>(client, tableName, AbstractFateStore.createDummyLockID(), null);
- fateId = store.create();
- txStore = store.reserve(fateId);
- }
-
- @AfterEach
- public void teardown() throws Exception {
- client.close();
- }
-
- private void testOperationWithStatuses(Runnable beforeOperation, Executable operation,
- EnumSet<TStatus> acceptableStatuses) throws Exception {
- for (TStatus status : TStatus.values()) {
- // Run any needed setup for the operation before each iteration
- beforeOperation.run();
-
- injectStatus(client, tableName, fateId, status);
- var fateIdStatus =
- store.list().filter(statusEntry -> statusEntry.getFateId().equals(fateId)).findFirst()
- .orElseThrow();
- assertEquals(status, fateIdStatus.getStatus());
- if (!acceptableStatuses.contains(status)) {
- assertThrows(IllegalStateException.class, operation,
- "Expected operation to fail with status " + status + " but it did not");
- } else {
- assertDoesNotThrow(operation,
- "Expected operation to succeed with status " + status + " but it did not");
- }
- }
- }
-
- @Test
- public void push() throws Exception {
- testOperationWithStatuses(() -> {}, // No special setup needed for push
- () -> txStore.push(new FateIT.TestRepo("testOp")),
- EnumSet.of(TStatus.IN_PROGRESS, TStatus.NEW));
- }
-
- @Test
- public void pop() throws Exception {
- testOperationWithStatuses(() -> {
- // Setup for pop: Ensure there something to pop by first pushing
- try {
- injectStatus(client, tableName, fateId, TStatus.NEW);
- txStore.push(new FateIT.TestRepo("testOp"));
- } catch (Exception e) {
- throw new RuntimeException("Failed to setup for pop", e);
- }
- }, txStore::pop, EnumSet.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL));
- }
-
- @Test
- public void delete() throws Exception {
- testOperationWithStatuses(() -> {
- // Setup for delete: Create a new txStore before each delete since delete cannot be called
- // on the same txStore more than once
- fateId = store.create();
- txStore = store.reserve(fateId);
- }, () -> txStore.delete(),
- EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED));
- }
- }
-
- /**
- * Inject a status into the status col of the fate store table for a given transaction id.
- */
- private void injectStatus(ClientContext client, String table, FateId fateId, TStatus status)
- throws TableNotFoundException {
- try (BatchWriter writer = client.createBatchWriter(table)) {
- Mutation mutation = new Mutation(new Text(fateId.getTxUUIDStr()));
- FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
- writer.addMutation(mutation);
- } catch (MutationsRejectedException e) {
- throw new RuntimeException(e);
- }
- }
-
- // Create the fate table with the exact configuration as the real Fate user instance table
- // including table properties and TabletAvailability
- public static void createFateTable(ClientContext client, String table) throws Exception {
- final var fateTableProps =
- client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName());
-
- TabletAvailability availability;
- try (var tabletStream = client.tableOperations()
- .getTabletInformation(AccumuloTable.FATE.tableName(), new Range())) {
- availability = tabletStream.map(TabletInformation::getTabletAvailability).distinct()
- .collect(MoreCollectors.onlyElement());
- }
-
- var newTableConf = new NewTableConfiguration().withInitialTabletAvailability(availability)
- .withoutDefaultIterators().setProperties(fateTableProps);
- client.tableOperations().create(table, newTableConf);
- var testFateTableProps = client.tableOperations().getTableProperties(table);
-
- // ensure that create did not set any other props
- assertEquals(fateTableProps, testFateTableProps);
- }
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java
index f3569f07aa..507a4b1b86 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java
@@ -18,7 +18,7 @@
*/
package org.apache.accumulo.test.fate.user;
-import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
import java.util.function.Predicate;