This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new d90f85b8cb updates FateCleaner to use SteadyTime (#4670) d90f85b8cb is described below commit d90f85b8cb86c9ca72064b8bcf1158ea9923dff5 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jun 14 09:36:38 2024 -0400 updates FateCleaner to use SteadyTime (#4670) fixes #4481 --- .../org/apache/accumulo/core/fate/FateCleaner.java | 49 ++++++++++------------ .../apache/accumulo/core/fate/FateCleanerTest.java | 48 +++++++++++++++------ .../java/org/apache/accumulo/manager/Manager.java | 2 +- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java index e1738c2167..20e6ef691d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@ -20,10 +20,12 @@ package org.apache.accumulo.core.fate; import java.time.Duration; import java.util.EnumSet; -import java.util.UUID; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.time.SteadyTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,54 +38,45 @@ import com.google.common.base.Preconditions; * field is used to track fate transactions that are candidates for cleanup. * * <p> - * No external time source is used. It starts tracking idle time when its created. - * - * <p> * The {@link #ageOff()} method on this class must be periodically called inorder to cleanup to * happen. */ public class FateCleaner<T> { public interface TimeSource { - long currentTimeNanos(); + SteadyTime steadyTime(); } // Statuses that can be aged off if idle for a prolonged period. private static final EnumSet<TStatus> AGE_OFF_STATUSES = EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL); - // This is used to determine if age off data was persisted by another instance of this object. - private final UUID instanceId = UUID.randomUUID(); - private static final Logger log = LoggerFactory.getLogger(FateCleaner.class); private final FateStore<T> store; - private final long ageOffTime; + private final Duration ageOffTime; private final TimeSource timeSource; private static class AgeOffInfo { - final UUID instanceId; - final long setTime; + final SteadyTime setTime; final TStatus status; public AgeOffInfo(String ageOffStr) { var tokens = ageOffStr.split(":"); - Preconditions.checkArgument(tokens.length == 3, "Malformed input %s", ageOffStr); - instanceId = UUID.fromString(tokens[0]); - setTime = Long.parseLong(tokens[1]); - status = TStatus.valueOf(tokens[2]); + Preconditions.checkArgument(tokens.length == 2, "Malformed input %s", ageOffStr); + setTime = SteadyTime.from(Long.parseLong(tokens[0]), TimeUnit.NANOSECONDS); + status = TStatus.valueOf(tokens[1]); } - public AgeOffInfo(UUID instanceId, long time, TStatus status) { - this.instanceId = instanceId; + public AgeOffInfo(SteadyTime time, TStatus status) { this.setTime = time; this.status = status; } @Override public String toString() { - return instanceId + ":" + setTime + ":" + status; + return setTime.getNanos() + ":" + status; } } @@ -97,9 +90,12 @@ public class FateCleaner<T> { } private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) { + SteadyTime currSteadyTime = timeSource.steadyTime(); + Duration elapsed = currSteadyTime.minus(ageOffInfo.setTime); + Preconditions.checkState(!elapsed.isNegative(), "Elapsed steady time is negative : %s %s %s", + currSteadyTime, ageOffInfo.setTime, elapsed); return AGE_OFF_STATUSES.contains(currStatus) && currStatus == ageOffInfo.status - && ageOffInfo.instanceId.equals(instanceId) - && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime; + && elapsed.compareTo(ageOffTime) > 0; } public void ageOff() { @@ -108,12 +104,10 @@ public class FateCleaner<T> { try { AgeOffInfo ageOffInfo = readAgeOffInfo(txStore); TStatus currStatus = txStore.getStatus(); - if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId) - || currStatus != ageOffInfo.status) { + if (ageOffInfo == null || currStatus != ageOffInfo.status) { // set or reset the age off info because it does not exists or it exists but is no // longer valid - var newAgeOffInfo = - new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus); + var newAgeOffInfo = new AgeOffInfo(timeSource.steadyTime(), currStatus); txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString()); log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo); } else if (shouldAgeOff(currStatus, ageOffInfo)) { @@ -127,8 +121,9 @@ public class FateCleaner<T> { } public FateCleaner(FateStore<T> store, Duration duration, TimeSource timeSource) { - this.store = store; - this.ageOffTime = duration.toNanos(); - this.timeSource = timeSource; + this.store = Objects.requireNonNull(store); + this.ageOffTime = Objects.requireNonNull(duration); + this.timeSource = Objects.requireNonNull(timeSource); + Preconditions.checkArgument(!duration.isNegative() && !duration.isZero()); } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java index eb0d1dc748..8a6a69c70d 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java @@ -20,13 +20,16 @@ package org.apache.accumulo.core.fate; import static java.util.stream.Collectors.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.Duration; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.FateCleaner.TimeSource; import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; @@ -36,10 +39,9 @@ public class FateCleanerTest { long time = 0; @Override - public long currentTimeNanos() { - return time; + public SteadyTime steadyTime() { + return SteadyTime.from(time, TimeUnit.NANOSECONDS); } - } @Test @@ -225,7 +227,8 @@ public class FateCleanerTest { @Test public void testNewCleaner() { - // this test ensures that a new cleaner instance ignores data from another cleaner instance + // this test ensures that a new cleaner instance uses persisted data from a previous cleaner + // instance TestTimeSource tts = new TestTimeSource(); TestStore testStore = new TestStore(); @@ -250,24 +253,45 @@ public class FateCleanerTest { assertEquals(Set.of(fateId2, fateId3), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); - // create a new cleaner, it should ignore any data stored by previous cleaner + // create a new cleaner, it should use the steady times persisted by previous cleaner instance FateCleaner<String> cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); tts.time += Duration.ofHours(5).toNanos(); - // since this is a new cleaner instance, it should reset the clock + cleaner2.ageOff(); - assertEquals(Set.of(fateId2, fateId3), - testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + assertEquals(Set.of(fateId3), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); - // since the clock was reset, advancing time should not age anything off - tts.time += Duration.ofHours(9).toNanos(); + tts.time += Duration.ofHours(4).toNanos(); cleaner2.ageOff(); - assertEquals(Set.of(fateId2, fateId3), - testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + assertEquals(Set.of(fateId3), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // this should advance time enough to age everything off tts.time += Duration.ofHours(2).toNanos(); cleaner2.ageOff(); assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); } + + @Test + public void testErrors() { + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + assertThrows(IllegalArgumentException.class, + () -> new FateCleaner<>(testStore, Duration.ofHours(-10), tts)); + assertThrows(IllegalArgumentException.class, + () -> new FateCleaner<>(testStore, Duration.ZERO, tts)); + assertThrows(NullPointerException.class, + () -> new FateCleaner<>(null, Duration.ofHours(10), tts)); + assertThrows(NullPointerException.class, () -> new FateCleaner<>(testStore, null, tts)); + assertThrows(NullPointerException.class, + () -> new FateCleaner<>(testStore, Duration.ofHours(10), null)); + + tts.time += Duration.ofHours(6).toNanos(); + + FateCleaner<String> cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + FateId fateId1 = testStore.create(); + cleaner1.ageOff(); + tts.time -= Duration.ofHours(3).toNanos(); + // steady time going backwards should cause an error + assertThrows(IllegalStateException.class, () -> cleaner1.ageOff()); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index e5deae2b26..c7e6339a22 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1262,7 +1262,7 @@ public class Manager extends AbstractServer final Fate<Manager> fateInstance = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); - var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), System::nanoTime); + var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));