This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new d90f85b8cb updates FateCleaner to use SteadyTime (#4670)
d90f85b8cb is described below

commit d90f85b8cb86c9ca72064b8bcf1158ea9923dff5
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jun 14 09:36:38 2024 -0400

    updates FateCleaner to use SteadyTime (#4670)
    
    fixes #4481
---
 .../org/apache/accumulo/core/fate/FateCleaner.java | 49 ++++++++++------------
 .../apache/accumulo/core/fate/FateCleanerTest.java | 48 +++++++++++++++------
 .../java/org/apache/accumulo/manager/Manager.java  |  2 +-
 3 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
index e1738c2167..20e6ef691d 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
@@ -20,10 +20,12 @@ package org.apache.accumulo.core.fate;
 
 import java.time.Duration;
 import java.util.EnumSet;
-import java.util.UUID;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,54 +38,45 @@ import com.google.common.base.Preconditions;
  * field is used to track fate transactions that are candidates for cleanup.
  *
  * <p>
- * No external time source is used. It starts tracking idle time when its created.
- *
- * <p>
  * The {@link #ageOff()} method on this class must be periodically called inorder to cleanup to
  * happen.
  */
 public class FateCleaner<T> {
 
   public interface TimeSource {
-    long currentTimeNanos();
+    SteadyTime steadyTime();
   }
 
   // Statuses that can be aged off if idle for a prolonged period.
   private static final EnumSet<TStatus> AGE_OFF_STATUSES =
       EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL);
 
-  // This is used to determine if age off data was persisted by another instance of this object.
-  private final UUID instanceId = UUID.randomUUID();
-
   private static final Logger log = LoggerFactory.getLogger(FateCleaner.class);
 
   private final FateStore<T> store;
 
-  private final long ageOffTime;
+  private final Duration ageOffTime;
   private final TimeSource timeSource;
 
   private static class AgeOffInfo {
-    final UUID instanceId;
-    final long setTime;
+    final SteadyTime setTime;
     final TStatus status;
 
     public AgeOffInfo(String ageOffStr) {
       var tokens = ageOffStr.split(":");
-      Preconditions.checkArgument(tokens.length == 3, "Malformed input %s", ageOffStr);
-      instanceId = UUID.fromString(tokens[0]);
-      setTime = Long.parseLong(tokens[1]);
-      status = TStatus.valueOf(tokens[2]);
+      Preconditions.checkArgument(tokens.length == 2, "Malformed input %s", ageOffStr);
+      setTime = SteadyTime.from(Long.parseLong(tokens[0]), TimeUnit.NANOSECONDS);
+      status = TStatus.valueOf(tokens[1]);
     }
 
-    public AgeOffInfo(UUID instanceId, long time, TStatus status) {
-      this.instanceId = instanceId;
+    public AgeOffInfo(SteadyTime time, TStatus status) {
       this.setTime = time;
       this.status = status;
     }
 
     @Override
     public String toString() {
-      return instanceId + ":" + setTime + ":" + status;
+      return setTime.getNanos() + ":" + status;
     }
   }
 
@@ -97,9 +90,12 @@ public class FateCleaner<T> {
   }
 
   private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) {
+    SteadyTime currSteadyTime = timeSource.steadyTime();
+    Duration elapsed = currSteadyTime.minus(ageOffInfo.setTime);
+    Preconditions.checkState(!elapsed.isNegative(), "Elapsed steady time is negative : %s %s %s",
+        currSteadyTime, ageOffInfo.setTime, elapsed);
     return AGE_OFF_STATUSES.contains(currStatus) && currStatus == ageOffInfo.status
-        && ageOffInfo.instanceId.equals(instanceId)
-        && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime;
+        && elapsed.compareTo(ageOffTime) > 0;
   }
 
   public void ageOff() {
@@ -108,12 +104,10 @@ public class FateCleaner<T> {
           try {
             AgeOffInfo ageOffInfo = readAgeOffInfo(txStore);
             TStatus currStatus = txStore.getStatus();
-            if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId)
-                || currStatus != ageOffInfo.status) {
+            if (ageOffInfo == null || currStatus != ageOffInfo.status) {
               // set or reset the age off info because it does not exists or it exists but is no
               // longer valid
-              var newAgeOffInfo =
-                  new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus);
+              var newAgeOffInfo = new AgeOffInfo(timeSource.steadyTime(), currStatus);
               txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString());
               log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo);
             } else if (shouldAgeOff(currStatus, ageOffInfo)) {
@@ -127,8 +121,9 @@ public class FateCleaner<T> {
   }
 
   public FateCleaner(FateStore<T> store, Duration duration, TimeSource timeSource) {
-    this.store = store;
-    this.ageOffTime = duration.toNanos();
-    this.timeSource = timeSource;
+    this.store = Objects.requireNonNull(store);
+    this.ageOffTime = Objects.requireNonNull(duration);
+    this.timeSource = Objects.requireNonNull(timeSource);
+    Preconditions.checkArgument(!duration.isNegative() && !duration.isZero());
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
index eb0d1dc748..8a6a69c70d 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
@@ -20,13 +20,16 @@ package org.apache.accumulo.core.fate;
 
 import static java.util.stream.Collectors.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.time.Duration;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.FateCleaner.TimeSource;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.Test;
 
@@ -36,10 +39,9 @@ public class FateCleanerTest {
     long time = 0;
 
     @Override
-    public long currentTimeNanos() {
-      return time;
+    public SteadyTime steadyTime() {
+      return SteadyTime.from(time, TimeUnit.NANOSECONDS);
     }
-
   }
 
   @Test
@@ -225,7 +227,8 @@ public class FateCleanerTest {
 
   @Test
   public void testNewCleaner() {
-    // this test ensures that a new cleaner instance ignores data from another cleaner instance
+    // this test ensures that a new cleaner instance uses persisted data from a previous cleaner
+    // instance
 
     TestTimeSource tts = new TestTimeSource();
     TestStore testStore = new TestStore();
@@ -250,24 +253,45 @@ public class FateCleanerTest {
     assertEquals(Set.of(fateId2, fateId3),
         testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 
-    // create a new cleaner, it should ignore any data stored by previous cleaner
+    // create a new cleaner, it should use the steady times persisted by previous cleaner instance
     FateCleaner<String> cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts);
 
     tts.time += Duration.ofHours(5).toNanos();
-    // since this is a new cleaner instance, it should reset the clock
+
     cleaner2.ageOff();
-    assertEquals(Set.of(fateId2, fateId3),
-        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
+    assertEquals(Set.of(fateId3), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 
-    // since the clock was reset, advancing time should not age anything off
-    tts.time += Duration.ofHours(9).toNanos();
+    tts.time += Duration.ofHours(4).toNanos();
     cleaner2.ageOff();
-    assertEquals(Set.of(fateId2, fateId3),
-        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
+    assertEquals(Set.of(fateId3), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 
     // this should advance time enough to age everything off
     tts.time += Duration.ofHours(2).toNanos();
     cleaner2.ageOff();
     assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
   }
+
+  @Test
+  public void testErrors() {
+    TestTimeSource tts = new TestTimeSource();
+    TestStore testStore = new TestStore();
+    assertThrows(IllegalArgumentException.class,
+        () -> new FateCleaner<>(testStore, Duration.ofHours(-10), tts));
+    assertThrows(IllegalArgumentException.class,
+        () -> new FateCleaner<>(testStore, Duration.ZERO, tts));
+    assertThrows(NullPointerException.class,
+        () -> new FateCleaner<>(null, Duration.ofHours(10), tts));
+    assertThrows(NullPointerException.class, () -> new FateCleaner<>(testStore, null, tts));
+    assertThrows(NullPointerException.class,
+        () -> new FateCleaner<>(testStore, Duration.ofHours(10), null));
+
+    tts.time += Duration.ofHours(6).toNanos();
+
+    FateCleaner<String> cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts);
+    FateId fateId1 = testStore.create();
+    cleaner1.ageOff();
+    tts.time -= Duration.ofHours(3).toNanos();
+    // steady time going backwards should cause an error
+    assertThrows(IllegalStateException.class, () -> cleaner1.ageOff());
+  }
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index e5deae2b26..c7e6339a22 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1262,7 +1262,7 @@ public class Manager extends AbstractServer
     final Fate<Manager> fateInstance =
         new Fate<>(this, store, TraceRepo::toLogString, getConfiguration());
 
-    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), System::nanoTime);
+    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime);
     ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
         .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
 

Reply via email to