This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 1e301e8bf4 Improvements to reservation column in FATE table and bug fix (#4992) 1e301e8bf4 is described below commit 1e301e8bf490152ce10e15a42d067f2f5b2c67ea Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Mon Oct 21 10:12:41 2024 -0400 Improvements to reservation column in FATE table and bug fix (#4992) * Makes it so the reservation column is created on reservation and deleted on unreservation (no longer store an unreserved value in the column) * Addresses a bug with MultipleStoresIT.testDeadReservationsCleanup() (ZooUtil.LockID was missing equals() and hashCode()) closes #4907 --- .../org/apache/accumulo/core/fate/FateStore.java | 18 ------------- .../accumulo/core/fate/user/FateMutator.java | 20 +------------- .../accumulo/core/fate/user/FateMutatorImpl.java | 23 +--------------- .../accumulo/core/fate/user/UserFateStore.java | 24 +++++++---------- .../accumulo/core/fate/zookeeper/ZooUtil.java | 19 +++++++++++++ .../accumulo/test/fate/MultipleStoresIT.java | 10 +++---- .../accumulo/test/fate/user/FateMutatorImplIT.java | 31 +--------------------- 7 files changed, 36 insertions(+), 109 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 6b7b68baf5..ae193d8df8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -29,7 +29,6 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import org.apache.accumulo.core.fate.user.FateMutatorImpl; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.hadoop.io.DataInputBuffer; @@ -147,19 +146,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { return new FateReservation(lockID, reservationUUID); } - /** - * @param serializedFateRes the value present in the table for the reservation column - * @return true if the array represents a valid serialized FateReservation object, false if it - * represents an unreserved value, error otherwise - */ - public static boolean isFateReservation(byte[] serializedFateRes) { - if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) { - return false; - } - deserialize(serializedFateRes); - return true; - } - public ZooUtil.LockID getLockID() { return lockID; } @@ -195,10 +181,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { } } - public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) { - return lockID1.serialize("/").equals(lockID2.serialize("/")); - } - @Override public String toString() { return lockID.serialize("/") + ":" + reservationUUID; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 8c39e89700..d199a7463e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -44,31 +44,13 @@ public interface FateMutator<T> { /** * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * put the reservation if the column doesn't exist yet. This should only be used for - * {@link UserFateStore#createAndReserve(FateKey)} - * - * @param reservation the reservation to attempt to put - * @return the FateMutator with this added mutation - */ - FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation reservation); - - /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * remove the given reservation if it matches what is present in the column. + * delete the column if the column value matches the given reservation * * @param reservation the reservation to attempt to remove * @return the FateMutator with this added mutation */ FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation); - /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * put the initial column value if it has not already been set yet - * - * @return the FateMutator with this added mutation - */ - FateMutator<T> putInitReservationVal(); - FateMutator<T> putName(byte[] data); FateMutator<T> putAutoClean(byte[] data); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index fcb0e4f1f1..5d99a8df3a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.fate.user; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.fate.AbstractFateStore.serialize; import static org.apache.accumulo.core.fate.user.UserFateStore.getRow; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; @@ -51,8 +50,6 @@ import org.apache.hadoop.io.Text; public class FateMutatorImpl<T> implements FateMutator<T> { - public static final byte[] NOT_RESERVED = "".getBytes(UTF_8); - private final ClientContext context; private final String tableName; private final FateId fateId; @@ -85,15 +82,6 @@ public class FateMutatorImpl<T> implements FateMutator<T> { @Override public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED); - mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); - return this; - } - - @Override - public FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation reservation) { Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); @@ -107,16 +95,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> { TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) .setValue(reservation.getSerialized()); mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); - return this; - } - - @Override - public FateMutator<T> putInitReservationVal() { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); - mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); + TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index e1cb4d6405..7446d1fafe 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -107,7 +107,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) - .putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate(); + .putCreateTime(System.currentTimeMillis()).tryMutate(); switch (status) { case ACCEPTED: @@ -137,8 +137,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { // Only need to retry if it is UNKNOWN for (int attempt = 0; attempt < maxAttempts; attempt++) { status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) - .putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis()) - .tryMutate(); + .putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate(); if (status != FateMutator.Status.UNKNOWN) { break; } @@ -182,9 +181,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { fateKeySeen = Optional.of(FateKey.deserialize(val.get())); break; case TxColumnFamily.RESERVATION: - if (FateReservation.isFateReservation(val.get())) { - reservationSeen = Optional.of(FateReservation.deserialize(val.get())); - } + reservationSeen = Optional.of(FateReservation.deserialize(val.get())); break; default: throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); @@ -231,7 +228,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> { // Create a unique FateReservation for this reservation attempt FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); - FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate(); + // requiring any status prevents creating an entry if the fate id doesn't exist + FateMutator.Status status = + newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate(); if (status.equals(FateMutator.Status.ACCEPTED)) { return Optional.of(new FateTxStoreImpl(fateId, reservation)); } else if (status.equals(FateMutator.Status.UNKNOWN)) { @@ -246,10 +245,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> { scanner.setRange(getRow(fateId)); scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); - FateReservation persistedRes = scanner.stream() - .filter(entry -> FateReservation.isFateReservation(entry.getValue().get())) - .map(entry -> FateReservation.deserialize(entry.getValue().get())).findFirst() - .orElse(null); + FateReservation persistedRes = + scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get())) + .findFirst().orElse(null); if (persistedRes != null && persistedRes.equals(reservation)) { return Optional.of(new FateTxStoreImpl(fateId, reservation)); } @@ -318,9 +316,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { status = TStatus.valueOf(val.toString()); break; case TxColumnFamily.RESERVATION: - if (FateReservation.isFateReservation(val.get())) { - reservation = FateReservation.deserialize(val.get()); - } + reservation = FateReservation.deserialize(val.get()); break; default: throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 51002ee574..263f17e440 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; @@ -92,6 +93,24 @@ public class ZooUtil { public String toString() { return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof LockID) { + LockID other = (LockID) obj; + return this.path.equals(other.path) && this.node.equals(other.node) + && this.eid == other.eid; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(path, node, eid); + } } // Need to use Collections.unmodifiableList() instead of List.of() or List.copyOf(), because diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index f5e537394d..edd6a53859 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -421,8 +421,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // Verify store1 has the transactions reserved and that they were reserved with lock1 reservations = store1.getActiveReservations(); assertEquals(allIds, reservations.keySet()); - reservations.values().forEach( - res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); if (isUserStore) { store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld); @@ -434,8 +433,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // store1 reservations = store2.getActiveReservations(); assertEquals(allIds, reservations.keySet()); - reservations.values().forEach( - res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); // Simulate what would happen if the Manager using the Fate object (fate1) died. // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1) @@ -455,8 +453,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // the workers for fate1 are hung up Wait.waitFor(() -> { Map<FateId,FateStore.FateReservation> store2Reservations = store2.getActiveReservations(); - boolean allReservedWithLock2 = store2Reservations.values().stream() - .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); + boolean allReservedWithLock2 = + store2Reservations.values().stream().allMatch(entry -> entry.getLockID().equals(lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; }, fate1.getDeadResCleanupDelay().toMillis() * 2); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java index fe16e2b014..1d80a5fb9b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java @@ -181,29 +181,14 @@ public class FateMutatorImplIT extends SharedMiniClusterBase { ClientContext context = (ClientContext) client; FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50); FateStore.FateReservation reservation = FateStore.FateReservation.from(lockID, UUID.randomUUID()); FateStore.FateReservation wrongReservation = FateStore.FateReservation.from(lockID, UUID.randomUUID()); - // Ensure that we cannot do anything in the column until it is initialized - FateMutator.Status status = - new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); - assertEquals(REJECTED, status); - status = - new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); - assertEquals(REJECTED, status); - - // Initialize the column and ensure we can't do it twice - status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); - assertEquals(REJECTED, status); - // Ensure that reserving is the only thing we can do - status = + FateMutator.Status status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); assertEquals(REJECTED, status); status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); @@ -226,20 +211,6 @@ public class FateMutatorImplIT extends SharedMiniClusterBase { status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); assertEquals(REJECTED, status); - - // Verify putReservedTxOnCreation works as expected - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(REJECTED, status); - status = - new FateMutatorImpl<>(context, table, fateId1).putUnreserveTx(reservation).tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(REJECTED, status); } }