This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e301e8bf4 Improvements to reservation column in FATE table and bug 
fix (#4992)
1e301e8bf4 is described below

commit 1e301e8bf490152ce10e15a42d067f2f5b2c67ea
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Mon Oct 21 10:12:41 2024 -0400

    Improvements to reservation column in FATE table and bug fix (#4992)
    
    * Makes it so the reservation column is created on reservation and deleted 
on unreservation (no longer store an unreserved value in the column)
    * Addresses a bug with MultipleStoresIT.testDeadReservationsCleanup() 
(ZooUtil.LockID was missing equals() and hashCode())
    
    closes #4907
---
 .../org/apache/accumulo/core/fate/FateStore.java   | 18 -------------
 .../accumulo/core/fate/user/FateMutator.java       | 20 +-------------
 .../accumulo/core/fate/user/FateMutatorImpl.java   | 23 +---------------
 .../accumulo/core/fate/user/UserFateStore.java     | 24 +++++++----------
 .../accumulo/core/fate/zookeeper/ZooUtil.java      | 19 +++++++++++++
 .../accumulo/test/fate/MultipleStoresIT.java       | 10 +++----
 .../accumulo/test/fate/user/FateMutatorImplIT.java | 31 +---------------------
 7 files changed, 36 insertions(+), 109 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 6b7b68baf5..ae193d8df8 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -29,7 +29,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 
-import org.apache.accumulo.core.fate.user.FateMutatorImpl;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.hadoop.io.DataInputBuffer;
 
@@ -147,19 +146,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> 
{
       return new FateReservation(lockID, reservationUUID);
     }
 
-    /**
-     * @param serializedFateRes the value present in the table for the 
reservation column
-     * @return true if the array represents a valid serialized FateReservation 
object, false if it
-     *         represents an unreserved value, error otherwise
-     */
-    public static boolean isFateReservation(byte[] serializedFateRes) {
-      if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) {
-        return false;
-      }
-      deserialize(serializedFateRes);
-      return true;
-    }
-
     public ZooUtil.LockID getLockID() {
       return lockID;
     }
@@ -195,10 +181,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> 
{
       }
     }
 
-    public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID 
lockID2) {
-      return lockID1.serialize("/").equals(lockID2.serialize("/"));
-    }
-
     @Override
     public String toString() {
       return lockID.serialize("/") + ":" + reservationUUID;
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index 8c39e89700..d199a7463e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -44,31 +44,13 @@ public interface FateMutator<T> {
 
   /**
    * Add a conditional mutation to {@link 
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-   * put the reservation if the column doesn't exist yet. This should only be 
used for
-   * {@link UserFateStore#createAndReserve(FateKey)}
-   *
-   * @param reservation the reservation to attempt to put
-   * @return the FateMutator with this added mutation
-   */
-  FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation 
reservation);
-
-  /**
-   * Add a conditional mutation to {@link 
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-   * remove the given reservation if it matches what is present in the column.
+   * delete the column if the column value matches the given reservation
    *
    * @param reservation the reservation to attempt to remove
    * @return the FateMutator with this added mutation
    */
   FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation);
 
-  /**
-   * Add a conditional mutation to {@link 
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-   * put the initial column value if it has not already been set yet
-   *
-   * @return the FateMutator with this added mutation
-   */
-  FateMutator<T> putInitReservationVal();
-
   FateMutator<T> putName(byte[] data);
 
   FateMutator<T> putAutoClean(byte[] data);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index fcb0e4f1f1..5d99a8df3a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.core.fate.user;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.fate.AbstractFateStore.serialize;
 import static org.apache.accumulo.core.fate.user.UserFateStore.getRow;
 import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
@@ -51,8 +50,6 @@ import org.apache.hadoop.io.Text;
 
 public class FateMutatorImpl<T> implements FateMutator<T> {
 
-  public static final byte[] NOT_RESERVED = "".getBytes(UTF_8);
-
   private final ClientContext context;
   private final String tableName;
   private final FateId fateId;
@@ -85,15 +82,6 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
 
   @Override
   public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
-    Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-        
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
-    mutation.addCondition(condition);
-    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new 
Value(reservation.getSerialized()));
-    return this;
-  }
-
-  @Override
-  public FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation 
reservation) {
     Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
         TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
     mutation.addCondition(condition);
@@ -107,16 +95,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
         TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
         .setValue(reservation.getSerialized());
     mutation.addCondition(condition);
-    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
-    return this;
-  }
-
-  @Override
-  public FateMutator<T> putInitReservationVal() {
-    Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
-    mutation.addCondition(condition);
-    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
+    TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
     return this;
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index e1cb4d6405..7446d1fafe 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -107,7 +107,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
       }
 
       var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW)
-          
.putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate();
+          .putCreateTime(System.currentTimeMillis()).tryMutate();
 
       switch (status) {
         case ACCEPTED:
@@ -137,8 +137,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
     // Only need to retry if it is UNKNOWN
     for (int attempt = 0; attempt < maxAttempts; attempt++) {
       status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-          
.putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis())
-          .tryMutate();
+          
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
       if (status != FateMutator.Status.UNKNOWN) {
         break;
       }
@@ -182,9 +181,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
                 fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
                 break;
               case TxColumnFamily.RESERVATION:
-                if (FateReservation.isFateReservation(val.get())) {
-                  reservationSeen = 
Optional.of(FateReservation.deserialize(val.get()));
-                }
+                reservationSeen = 
Optional.of(FateReservation.deserialize(val.get()));
                 break;
               default:
                 throw new IllegalStateException("Unexpected column seen: " + 
colf + ":" + colq);
@@ -231,7 +228,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
     // Create a unique FateReservation for this reservation attempt
     FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
 
-    FateMutator.Status status = 
newMutator(fateId).putReservedTx(reservation).tryMutate();
+    // requiring any status prevents creating an entry if the fate id doesn't 
exist
+    FateMutator.Status status =
+        
newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate();
     if (status.equals(FateMutator.Status.ACCEPTED)) {
       return Optional.of(new FateTxStoreImpl(fateId, reservation));
     } else if (status.equals(FateMutator.Status.UNKNOWN)) {
@@ -246,10 +245,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> 
{
         scanner.setRange(getRow(fateId));
         
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
             TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
-        FateReservation persistedRes = scanner.stream()
-            .filter(entry -> 
FateReservation.isFateReservation(entry.getValue().get()))
-            .map(entry -> 
FateReservation.deserialize(entry.getValue().get())).findFirst()
-            .orElse(null);
+        FateReservation persistedRes =
+            scanner.stream().map(entry -> 
FateReservation.deserialize(entry.getValue().get()))
+                .findFirst().orElse(null);
         if (persistedRes != null && persistedRes.equals(reservation)) {
           return Optional.of(new FateTxStoreImpl(fateId, reservation));
         }
@@ -318,9 +316,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
               status = TStatus.valueOf(val.toString());
               break;
             case TxColumnFamily.RESERVATION:
-              if (FateReservation.isFateReservation(val.get())) {
-                reservation = FateReservation.deserialize(val.get());
-              }
+              reservation = FateReservation.deserialize(val.get());
               break;
             default:
               throw new IllegalStateException("Unexpected column seen: " + 
colf + ":" + colq);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
index 51002ee574..263f17e440 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
@@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.InstanceId;
@@ -92,6 +93,24 @@ public class ZooUtil {
     public String toString() {
       return " path = " + path + " node = " + node + " eid = " + 
Long.toHexString(eid);
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof LockID) {
+        LockID other = (LockID) obj;
+        return this.path.equals(other.path) && this.node.equals(other.node)
+            && this.eid == other.eid;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(path, node, eid);
+    }
   }
 
   // Need to use Collections.unmodifiableList() instead of List.of() or 
List.copyOf(), because
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index f5e537394d..edd6a53859 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -421,8 +421,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     // Verify store1 has the transactions reserved and that they were reserved 
with lock1
     reservations = store1.getActiveReservations();
     assertEquals(allIds, reservations.keySet());
-    reservations.values().forEach(
-        res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, 
res.getLockID())));
+    reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));
 
     if (isUserStore) {
       store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld);
@@ -434,8 +433,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     // store1
     reservations = store2.getActiveReservations();
     assertEquals(allIds, reservations.keySet());
-    reservations.values().forEach(
-        res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, 
res.getLockID())));
+    reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));
 
     // Simulate what would happen if the Manager using the Fate object (fate1) 
died.
     // isLockHeld would return false for the LockId of the Manager that died 
(in this case, lock1)
@@ -455,8 +453,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     // the workers for fate1 are hung up
     Wait.waitFor(() -> {
       Map<FateId,FateStore.FateReservation> store2Reservations = 
store2.getActiveReservations();
-      boolean allReservedWithLock2 = store2Reservations.values().stream()
-          .allMatch(entry -> 
FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2));
+      boolean allReservedWithLock2 =
+          store2Reservations.values().stream().allMatch(entry -> 
entry.getLockID().equals(lock2));
       return store2Reservations.keySet().equals(allIds) && 
allReservedWithLock2;
     }, fate1.getDeadResCleanupDelay().toMillis() * 2);
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
index fe16e2b014..1d80a5fb9b 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
@@ -181,29 +181,14 @@ public class FateMutatorImplIT extends 
SharedMiniClusterBase {
       ClientContext context = (ClientContext) client;
 
       FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
-      FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
       ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50);
       FateStore.FateReservation reservation =
           FateStore.FateReservation.from(lockID, UUID.randomUUID());
       FateStore.FateReservation wrongReservation =
           FateStore.FateReservation.from(lockID, UUID.randomUUID());
 
-      // Ensure that we cannot do anything in the column until it is 
initialized
-      FateMutator.Status status =
-          new FateMutatorImpl<>(context, table, 
fateId).putReservedTx(reservation).tryMutate();
-      assertEquals(REJECTED, status);
-      status =
-          new FateMutatorImpl<>(context, table, 
fateId).putUnreserveTx(reservation).tryMutate();
-      assertEquals(REJECTED, status);
-
-      // Initialize the column and ensure we can't do it twice
-      status = new FateMutatorImpl<>(context, table, 
fateId).putInitReservationVal().tryMutate();
-      assertEquals(ACCEPTED, status);
-      status = new FateMutatorImpl<>(context, table, 
fateId).putInitReservationVal().tryMutate();
-      assertEquals(REJECTED, status);
-
       // Ensure that reserving is the only thing we can do
-      status =
+      FateMutator.Status status =
           new FateMutatorImpl<>(context, table, 
fateId).putUnreserveTx(reservation).tryMutate();
       assertEquals(REJECTED, status);
       status = new FateMutatorImpl<>(context, table, 
fateId).putReservedTx(reservation).tryMutate();
@@ -226,20 +211,6 @@ public class FateMutatorImplIT extends 
SharedMiniClusterBase {
       status =
           new FateMutatorImpl<>(context, table, 
fateId).putUnreserveTx(reservation).tryMutate();
       assertEquals(REJECTED, status);
-
-      // Verify putReservedTxOnCreation works as expected
-      status = new FateMutatorImpl<>(context, table, 
fateId1).putReservedTxOnCreation(reservation)
-          .tryMutate();
-      assertEquals(ACCEPTED, status);
-      status = new FateMutatorImpl<>(context, table, 
fateId1).putReservedTxOnCreation(reservation)
-          .tryMutate();
-      assertEquals(REJECTED, status);
-      status =
-          new FateMutatorImpl<>(context, table, 
fateId1).putUnreserveTx(reservation).tryMutate();
-      assertEquals(ACCEPTED, status);
-      status = new FateMutatorImpl<>(context, table, 
fateId1).putReservedTxOnCreation(reservation)
-          .tryMutate();
-      assertEquals(REJECTED, status);
     }
   }
 

Reply via email to