This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 23a921726dc866c926ed389c06beaf4bd0512d8f Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Aug 23 11:22:38 2024 -0400 Revert "Merge Timer/CountdownTimer/NanoTime changes into elasticity (#4821)" This reverts commit fa4b73f0f9b7d3707bf04c603ae79d5c2dccd55e. --- .../core/clientImpl/ClientTabletCache.java | 11 +- .../core/clientImpl/ClientTabletCacheImpl.java | 33 ++--- .../java/org/apache/accumulo/core/fate/Fate.java | 10 +- .../org/apache/accumulo/core/lock/ServiceLock.java | 9 +- .../java/org/apache/accumulo/core/util/Timer.java | 25 ++++ .../util/compaction/ExternalCompactionUtil.java | 7 +- .../apache/accumulo/core/util/time/NanoTime.java | 104 +++++++++++++ .../org/apache/accumulo/core/util/TimerTest.java | 34 +++++ .../accumulo/core/util/time/NanoTimeTest.java | 162 +++++++++++++++++++++ .../accumulo/server/compaction/FileCompactor.java | 14 +- .../org/apache/accumulo/compactor/Compactor.java | 6 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 9 +- .../java/org/apache/accumulo/manager/Manager.java | 6 +- .../availability/SetTabletAvailability.java | 8 +- .../manager/tableOps/merge/ReserveTablets.java | 7 +- .../org/apache/accumulo/tserver/ScanServer.java | 6 +- .../accumulo/tserver/UnloadTabletHandler.java | 6 +- 17 files changed, 385 insertions(+), 72 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java index 5a23cad2d4..a31ca2418b 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java @@ -44,8 +44,8 @@ import org.apache.accumulo.core.metadata.MetadataCachedTabletObtainer; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonService; import org.apache.accumulo.core.util.Interner; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; @@ -311,7 +311,7 @@ public abstract class ClientTabletCache { private final TabletAvailability availability; private final boolean hostingRequested; - private final Timer creationTimer = Timer.startNew(); + private final NanoTime creationTime = NanoTime.now(); public CachedTablet(KeyExtent tablet_extent, String tablet_location, String session, TabletAvailability availability, boolean hostingRequested) { @@ -392,11 +392,8 @@ public abstract class ClientTabletCache { return this.availability; } - /** - * @return a timer that was started when this object was created - */ - public Timer getCreationTimer() { - return creationTimer; + public NanoTime getCreationTime() { + return creationTime; } public boolean wasHostingRequested() { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java index 7aa10260cb..7f3de5819c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.clientImpl; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import java.time.Duration; import java.util.ArrayList; @@ -61,6 +60,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; import org.slf4j.Logger; @@ -238,14 +238,14 @@ public class ClientTabletCacheImpl extends ClientTabletCache { // Want to ignore any entries in the cache w/o a location that were created before the // following time. Entries created after the following time may have been populated by the // following loop, and we want to use those. - Timer cacheCutoffTimer = Timer.startNew(); + var cacheCutoff = NanoTime.now(); for (T mutation : notInCache) { row.set(mutation.getRow()); CachedTablet tl = _findTablet(context, row, false, false, false, lcSession, - LocationNeed.REQUIRED, cacheCutoffTimer); + LocationNeed.REQUIRED, cacheCutoff); if (!addMutation(binnedMutations, mutation, tl, lcSession)) { failures.add(mutation); @@ -328,7 +328,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { // Use anything in the cache w/o a location populated after this point in time. Cache entries // w/o a location created before the following time should be ignored and the metadata table // consulted. - Timer cacheCutoffTimer = Timer.startNew(); + var cacheCutoff = NanoTime.now(); l1: for (Range range : ranges) { @@ -348,7 +348,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { tl = lcSession.checkLock(findTabletInCache(startRow)); } else { tl = _findTablet(context, startRow, false, false, false, lcSession, locationNeed, - cacheCutoffTimer); + cacheCutoff); } if (tl == null) { @@ -367,7 +367,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { tl = lcSession.checkLock(findTabletInCache(row)); } else { tl = _findTablet(context, tl.getExtent().endRow(), true, false, false, lcSession, - locationNeed, cacheCutoffTimer); + locationNeed, cacheCutoff); } if (tl == null) { @@ -561,7 +561,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { LockCheckerSession lcSession = new LockCheckerSession(); CachedTablet tl = - _findTablet(context, row, skipRow, false, true, lcSession, locationNeed, Timer.startNew()); + _findTablet(context, row, skipRow, false, true, lcSession, locationNeed, NanoTime.now()); if (timer != null) { log.trace("tid={} Located tablet {} at {} in {}", Thread.currentThread().getId(), @@ -613,7 +613,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { // Use anything in the cache w/o a location populated after this point in time. Cache entries // w/o a location created before the following time should be ignored and the metadata table // consulted. - Timer cacheCutoffTimer = Timer.startNew(); + var cacheCutoff = NanoTime.now(); for (int i = 0; i < hostAheadCount; i++) { if (currTablet.endRow() == null || hostAheadRange @@ -622,7 +622,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { } CachedTablet followingTablet = _findTablet(context, currTablet.endRow(), true, false, true, - lcSession, locationNeed, cacheCutoffTimer); + lcSession, locationNeed, cacheCutoff); if (followingTablet == null) { break; @@ -684,14 +684,14 @@ public class ClientTabletCacheImpl extends ClientTabletCache { List<TKeyExtent> extentsToBringOnline = new ArrayList<>(); for (var cachedTablet : tabletsWithNoLocation) { - if (cachedTablet.getCreationTimer().elapsed().compareTo(STALE_DURATION) < 0) { + if (cachedTablet.getCreationTime().elapsed().compareTo(STALE_DURATION) < 0) { if (cachedTablet.getAvailability() == TabletAvailability.ONDEMAND) { if (!cachedTablet.wasHostingRequested()) { extentsToBringOnline.add(cachedTablet.getExtent().toThrift()); log.trace("requesting ondemand tablet to be hosted {}", cachedTablet.getExtent()); } else { log.trace("ignoring ondemand tablet that already has a hosting request in place {} {}", - cachedTablet.getExtent(), cachedTablet.getCreationTimer().elapsed()); + cachedTablet.getExtent(), cachedTablet.getCreationTime().elapsed()); } } else if (cachedTablet.getAvailability() == TabletAvailability.UNHOSTED) { throw new InvalidTabletHostingRequestException("Extent " + cachedTablet.getExtent() @@ -861,13 +861,13 @@ public class ClientTabletCacheImpl extends ClientTabletCache { } /** - * @param cacheCutoffTimer Tablets w/o locations are cached. When LocationNeed is REQUIRED, this - * Timer value is used to determine if cached entries w/o a location should be used or of - * we should instead ignore them and reread the tablet information from the metadata table. + * @param cacheCutoff Tablets w/o locations are cached. When LocationNeed is REQUIRED, this cut + * off is used to determine if cached entries w/o a location should be used or of we should + * instead ignore them and reread the tablet information from the metadata table. */ protected CachedTablet _findTablet(ClientContext context, Text row, boolean skipRow, boolean retry, boolean lock, LockCheckerSession lcSession, LocationNeed locationNeed, - Timer cacheCutoffTimer) throws AccumuloException, AccumuloSecurityException, + NanoTime cacheCutoff) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, InvalidTabletHostingRequestException { if (skipRow) { @@ -889,8 +889,7 @@ public class ClientTabletCacheImpl extends ClientTabletCache { } if (tl == null || (locationNeed == LocationNeed.REQUIRED && tl.getTserverLocation().isEmpty() - && tl.getCreationTimer().elapsed(NANOSECONDS) > cacheCutoffTimer.elapsed(NANOSECONDS))) { - + && tl.getCreationTime().compareTo(cacheCutoff) < 0)) { // not in cache OR the cached entry was created before the cut off time, so obtain info from // metadata table if (lock) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index add5b7cf11..329e432b9b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -52,10 +52,10 @@ import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.util.ShutdownUtil; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -307,18 +307,18 @@ public class Fate<T> { } protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception { - var startTime = Timer.startNew(); + var startTime = NanoTime.now(); var deferTime = op.isReady(fateId, environment); log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId, - startTime.elapsed(MILLISECONDS), deferTime); + startTime.elapsed().toMillis(), deferTime); return deferTime; } protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception { - var startTime = Timer.startNew(); + var startTime = NanoTime.now(); var next = op.call(fateId, environment); log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId, - startTime.elapsed(MILLISECONDS), next == null ? "null" : next.getName()); + startTime.elapsed().toMillis(), next == null ? "null" : next.getName()); return next; } diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java index d04d82714a..91a1232954 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.lock; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.SECONDS; import java.util.ArrayList; import java.util.List; @@ -33,8 +32,8 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UuidUtil; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -560,11 +559,11 @@ public class ServiceLock implements Watcher { ZooUtil.recursiveDelete(zooKeeper, pathToDelete, NodeMissingPolicy.SKIP); // Wait for the delete to happen on the server before exiting method - Timer start = Timer.startNew(); + NanoTime start = NanoTime.now(); while (zooKeeper.exists(pathToDelete, null) != null) { Thread.onSpinWait(); - if (start.hasElapsed(10, SECONDS)) { - start.restart(); + if (NanoTime.now().subtract(start).toSeconds() > 10) { + start = NanoTime.now(); LOG.debug("[{}] Still waiting for zookeeper to delete all at {}", vmLockPrefix, pathToDelete); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/Timer.java b/core/src/main/java/org/apache/accumulo/core/util/Timer.java index cf06789993..b7fa4567cf 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Timer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Timer.java @@ -32,6 +32,10 @@ public final class Timer { this.startNanos = System.nanoTime(); } + private Timer(long offsetNanos) { + this.startNanos = System.nanoTime() + offsetNanos; + } + /** * Creates and starts a new Timer instance. * @@ -41,6 +45,27 @@ public final class Timer { return new Timer(); } + /** + * Creates a new Timer with an offset applied. + * + * @param offset the duration of the offset to apply. + * @return a new Timer instance with the specified offset. + */ + public static Timer startNewWithOffset(Duration offset) { + return new Timer(offset.toNanos()); + } + + /** + * Creates a new Timer with an offset applied. + * + * @param offset the duration of the offset to apply. + * @param unit the TimeUnit of the offset. + * @return a new Timer instance with the specified offset. + */ + public static Timer startNewWithOffset(long offset, TimeUnit unit) { + return new Timer(unit.toNanos(offset)); + } + /** * Resets the start point for this timer. */ diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 057818b0ac..5aa91afaa1 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.util.compaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; @@ -48,8 +47,8 @@ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -280,7 +279,7 @@ public class ExternalCompactionUtil { } public static int countCompactors(String groupName, ClientContext context) { - var start = Timer.startNew(); + var start = NanoTime.now(); String groupRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + groupName; List<String> children = context.getZooCache().getChildren(groupRoot); if (children == null) { @@ -296,7 +295,7 @@ public class ExternalCompactionUtil { } } - long elapsed = start.elapsed(MILLISECONDS); + long elapsed = start.elapsed().toMillis(); if (elapsed > 100) { LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName); } else { diff --git a/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java b/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java new file mode 100644 index 0000000000..f081278589 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.time; + +import java.time.Duration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class implements a strong type for System.nanoTime() that offers the limited operations that + * can be performed on a nanoTime. See the System.nanoTime() javadoc for details - specifically + * these values are meaningful only when the difference between two such values, obtained within the + * same instance of a Java virtual machine, are computed. + */ +public final class NanoTime implements Comparable<NanoTime> { + // In the System.nanoTime javadoc it describes the returned value as the "nanoseconds since some + // fixed but arbitrary origin time (perhaps in the future, so values may be negative)". This + // variable name is derived from that where AO is arbitrary origin. + private final long nanosSinceAO; + + // This method should only be called by test inorder to test edge conditions, that is why it is + // package private. Calling this outside of test makes it hard to reason about the correctness of + // using this class. + @VisibleForTesting + NanoTime(long ntsao) { + this.nanosSinceAO = ntsao; + } + + /** + * @return this.nanoTime - other.nanoTime as a Duration + */ + public Duration subtract(NanoTime other) { + return Duration.ofNanos(nanosSinceAO - other.nanosSinceAO); + } + + /** + * Determines the amount of time that has elapsed since this object was created relative to the + * current nanoTime. + * + * @return System.nanoTime() - this.nanoTime + */ + public Duration elapsed() { + return Duration.ofNanos(System.nanoTime() - nanosSinceAO); + } + + @Override + public boolean equals(Object other) { + if (other instanceof NanoTime) { + return nanosSinceAO == ((NanoTime) other).nanosSinceAO; + } + + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(nanosSinceAO); + } + + @Override + public int compareTo(NanoTime other) { + // All operations w/ nanoTimes must use differences, can not directly compare. This is because a + // nano time of Long.MAX_VALUE -10 is considered less than Long.MAX_VALUE +10 + long diff = nanosSinceAO - other.nanosSinceAO; + + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } else { + return 0; + } + } + + /** + * @return a NanoTime created using System.nanoTime() + */ + public static NanoTime now() { + return new NanoTime(System.nanoTime()); + } + + /** + * @return a NanoTime created using System.nanoTime() + duration.toNanos() + */ + public static NanoTime nowPlus(Duration duration) { + return new NanoTime(System.nanoTime() + duration.toNanos()); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/util/TimerTest.java b/core/src/test/java/org/apache/accumulo/core/util/TimerTest.java index 67b40d07e3..c9fcb9e464 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/TimerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/TimerTest.java @@ -96,4 +96,38 @@ public class TimerTest { } + @Test + public void testStartNewWithOffsetDuration() throws InterruptedException { + Timer timer = Timer.startNewWithOffset(Duration.ofMillis(100)); + + assertFalse(timer.hasElapsed(Duration.ZERO)); + + Thread.sleep(50); + + assertFalse(timer.hasElapsed(Duration.ZERO), + "The timer should not indicate time has elapsed before the offset has passed."); + + Thread.sleep(60); + + assertTrue(timer.hasElapsed(Duration.ZERO), + "The timer should indicate time has elapsed after the offset has passed."); + } + + @Test + public void testStartNewWithOffsetTimeUnit() throws InterruptedException { + Timer timer = Timer.startNewWithOffset(100, MILLISECONDS); + + assertFalse(timer.hasElapsed(0, MILLISECONDS)); + + Thread.sleep(50); + + assertFalse(timer.hasElapsed(0, MILLISECONDS), + "The timer should not indicate time has elapsed before the offset has passed."); + + Thread.sleep(60); + + assertTrue(timer.hasElapsed(0, MILLISECONDS), + "The timer should indicate time has elapsed after the offset has passed."); + } + } diff --git a/core/src/test/java/org/apache/accumulo/core/util/time/NanoTimeTest.java b/core/src/test/java/org/apache/accumulo/core/util/time/NanoTimeTest.java new file mode 100644 index 0000000000..d306aafc39 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/time/NanoTimeTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.time; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Test; + +public class NanoTimeTest { + @Test + public void testMultipleTimes() { + List<NanoTime> ntimes = new ArrayList<>(); + + NanoTime prev = NanoTime.now(); + ntimes.add(prev); + + for (int i = 0; i < 100; i++) { + NanoTime next = NanoTime.now(); + while (prev.equals(next)) { + next = NanoTime.now(); + } + + ntimes.add(next); + prev = next; + } + + long curr = System.nanoTime(); + while (curr == System.nanoTime()) {} + + var start = NanoTime.now(); + + while (start.equals(NanoTime.now())) {} + + for (int i = 1; i < ntimes.size(); i++) { + var last = ntimes.get(i - 1); + var next = ntimes.get(i); + assertTrue(last.compareTo(next) < 0); + assertTrue(next.compareTo(last) > 0); + assertTrue(next.compareTo(next) == 0); + assertTrue(next.elapsed().toNanos() > 0); + assertEquals(next, next); + assertEquals(next.hashCode(), next.hashCode()); + assertNotEquals(last, next); + assertNotEquals(last.hashCode(), next.hashCode()); + + var duration1 = next.elapsed(); + var duration2 = start.subtract(last); + var duration3 = start.subtract(next); + + assertTrue(duration2.compareTo(duration3) > 0); + assertTrue(duration1.compareTo(duration3) > 0); + } + + var copy = List.copyOf(ntimes); + Collections.shuffle(ntimes); + Collections.sort(ntimes); + assertEquals(copy, ntimes); + } + + @Test + public void testBoundry() { + // tests crossing the Long.MAX_VALUE boundry + long origin = Long.MAX_VALUE - 1000; + + List<NanoTime> ntimes = new ArrayList<>(); + + // add times that start positive and then go negative + for (int i = 0; i < 20; i++) { + var nt = i * 100 + origin; + ntimes.add(new NanoTime(nt)); + } + + for (int i = 1; i < ntimes.size(); i++) { + var last = ntimes.get(i - 1); + var next = ntimes.get(i); + assertEquals(100, next.subtract(last).toNanos()); + assertEquals(-100, last.subtract(next).toNanos()); + assertTrue(next.compareTo(last) > 0); + assertTrue(last.compareTo(next) < 0); + assertTrue(next.compareTo(next) == 0); + } + + var copy = List.copyOf(ntimes); + Collections.shuffle(ntimes); + Collections.sort(ntimes); + assertEquals(copy, ntimes); + } + + @Test + public void testNowPlus() { + + List<NanoTime> ntimes = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ntimes.add(NanoTime.nowPlus(Duration.ofHours(i))); + } + + for (int i = 1; i < ntimes.size(); i++) { + var last = ntimes.get(i - 1); + var next = ntimes.get(i); + + var duration = next.subtract(last); + + assertTrue(duration.compareTo(Duration.ofHours(1)) >= 0); + // This could fail if the test process were paused for more than 3 minutes + assertTrue(duration.compareTo(Duration.ofMinutes(63)) < 0); + assertTrue(next.elapsed().compareTo(Duration.ZERO) < 0); + } + + var copy = List.copyOf(ntimes); + Collections.shuffle(ntimes); + Collections.sort(ntimes); + assertEquals(copy, ntimes); + + ntimes.clear(); + + // nano time can compute elapsed times in a 290 year period which should wrap Long.MAX_VALUE no + // matter where it starts + for (int i = 0; i < 290; i++) { + ntimes.add(NanoTime.nowPlus(Duration.ofDays(365 * i))); + } + + for (int i = 1; i < ntimes.size(); i++) { + var last = ntimes.get(i - 1); + var next = ntimes.get(i); + + var duration = next.subtract(last); + + assertTrue(duration.compareTo(Duration.ofDays(365)) >= 0); + assertTrue(duration.compareTo(Duration.ofDays(366)) < 0); + assertTrue(next.elapsed().compareTo(Duration.ZERO) < 0); + } + + copy = List.copyOf(ntimes); + Collections.shuffle(ntimes); + Collections.sort(ntimes); + assertEquals(copy, ntimes); + } + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 02917807cd..2fd823e447 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.server.compaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -69,7 +67,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; @@ -122,7 +120,7 @@ public class FileCompactor implements Callable<CompactionStats> { // things to report private String currentLocalityGroup = ""; - private volatile Timer startTime; + private volatile NanoTime startTime; private final AtomicInteger timesPaused = new AtomicInteger(0); @@ -137,7 +135,7 @@ public class FileCompactor implements Callable<CompactionStats> { private static final LongAdder totalEntriesRead = new LongAdder(); private static final LongAdder totalEntriesWritten = new LongAdder(); - private static final Timer lastUpdateTime = Timer.startNew(); + private static volatile NanoTime lastUpdateTime = NanoTime.now(); private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); @@ -214,11 +212,11 @@ public class FileCompactor implements Callable<CompactionStats> { * is rate limited, so it will not cause issues if called too frequently. */ private static void updateTotalEntries() { - if (!lastUpdateTime.hasElapsed(100, MILLISECONDS)) { + if (lastUpdateTime.elapsed().compareTo(Duration.ofMillis(100)) < 0) { return; } runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts); - lastUpdateTime.restart(); + lastUpdateTime = NanoTime.now(); } protected static final Set<FileCompactor> runningCompactions = @@ -281,7 +279,7 @@ public class FileCompactor implements Callable<CompactionStats> { CompactionStats majCStats = new CompactionStats(); - startTime = Timer.startNew(); + startTime = NanoTime.now(); boolean remove = runningCompactions.add(this); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index ea8a90bfe4..bf54437d73 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -95,11 +95,11 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; @@ -514,12 +514,12 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac return new FileCompactorRunnable() { private final AtomicReference<FileCompactor> compactor = new AtomicReference<>(); - private volatile Timer compactionStartTime; + private volatile NanoTime compactionStartTime; @Override public void initialize() throws RetriesExceededException { LOG.info("Starting up compaction runnable for job: {}", job); - this.compactionStartTime = Timer.startNew(); + this.compactionStartTime = NanoTime.now(); TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED, "Compaction started", -1, -1, -1, getCompactionAge().toNanos()); updateCompactionState(job, update); diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index a4c9b5f574..b32751e904 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -19,7 +19,6 @@ package org.apache.accumulo.gc; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.io.FileNotFoundException; import java.io.IOException; @@ -54,9 +53,9 @@ import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; import org.apache.accumulo.server.AbstractServer; @@ -91,7 +90,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); private ServiceLock gcLock; - private Timer lastCompactorCheck = Timer.startNew(); + private NanoTime lastCompactorCheck = NanoTime.now(); SimpleGarbageCollector(ConfigOpts opts, String[] args) { super("gc", opts, ServerContext::new, args); @@ -307,7 +306,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { gcCycleMetrics.incrementRunCycleCount(); long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); - if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) { + if (NanoTime.now().subtract(lastCompactorCheck).toMillis() > gcDelay * 3) { Map<String,Set<TableId>> resourceMapping = new HashMap<>(); for (TableId tid : AccumuloTable.allTableIds()) { TableConfiguration tconf = getContext().getTableConfiguration(tid); @@ -322,7 +321,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { e.getValue()); } } - lastCompactorCheck.restart(); + lastCompactorCheck = NanoTime.now(); } log.debug("Sleeping for {} milliseconds", gcDelay); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index e3b99b1615..72c79121e5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -115,9 +115,9 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.metrics.BalancerMetrics; @@ -1066,10 +1066,10 @@ public class Manager extends AbstractServer // wait at least 10 seconds final Duration timeToWait = Comparators.max(Duration.ofSeconds(10), Duration.ofMillis(rpcTimeout / 3)); - final Timer startTime = Timer.startNew(); + final NanoTime startTime = NanoTime.now(); // Wait for all tasks to complete while (!tasks.isEmpty()) { - boolean cancel = startTime.hasElapsed(timeToWait); + boolean cancel = (startTime.elapsed().compareTo(timeToWait) > 0); Iterator<Future<?>> iter = tasks.iterator(); while (iter.hasNext()) { Future<?> f = iter.next(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java index bd94afd15b..59c010887f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.manager.tableOps.availability; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -42,7 +40,7 @@ import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; @@ -99,7 +97,7 @@ public class SetTabletAvailability extends ManagerRepo { } }; - var start = Timer.startNew(); + var start = NanoTime.now(); try ( TabletsMetadata m = manager.getContext().getAmple().readTablets().forTable(tableId) .overlapping(scanRangeStart, true, null).build(); @@ -141,7 +139,7 @@ public class SetTabletAvailability extends ManagerRepo { } if (notAccepted.get() > 0) { - return Math.min(30000, Math.max(start.elapsed(MILLISECONDS), 1)); + return Math.min(30000, Math.max(start.elapsed().toMillis(), 1)); } else { return 0; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index ffaa6adcd2..7380aafe81 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.manager.tableOps.merge; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; @@ -33,7 +32,7 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; -import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.slf4j.Logger; @@ -72,7 +71,7 @@ public class ReserveTablets extends ManagerRepo { int locations = 0; int wals = 0; - var startTime = Timer.startNew(); + var startTime = NanoTime.now(); try ( var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, LOCATION, LOGS, OPID) @@ -98,7 +97,7 @@ public class ReserveTablets extends ManagerRepo { count++; } } - var maxSleepTime = Math.min(60000, startTime.elapsed(MILLISECONDS)); + var maxSleepTime = Math.min(60000, startTime.elapsed().toMillis()); log.debug( "{} reserve tablets op:{} count:{} other opids:{} opids set:{} locations:{} accepted:{} wals:{}", diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 7827358ca8..c8516d103d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -91,10 +91,10 @@ import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; @@ -720,7 +720,7 @@ public class ScanServer extends AbstractServer @VisibleForTesting ScanReservation reserveFilesInstrumented(Map<KeyExtent,List<TRange>> extents) throws AccumuloException { - Timer start = Timer.startNew(); + NanoTime start = NanoTime.now(); try { return reserveFiles(extents); } finally { @@ -761,7 +761,7 @@ public class ScanServer extends AbstractServer @VisibleForTesting ScanReservation reserveFilesInstrumented(long scanId) throws NoSuchScanIDException { - Timer start = Timer.startNew(); + NanoTime start = NanoTime.now(); try { return reserveFiles(scanId); } finally { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java index cf2ac7fe1c..752260ee8e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java @@ -23,7 +23,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.TabletLoadState; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; -import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.manager.state.DistributedStoreException; import org.apache.accumulo.server.manager.state.TabletStateStore; @@ -37,7 +37,7 @@ class UnloadTabletHandler implements Runnable { private final KeyExtent extent; private final TUnloadTabletGoal goalState; private final SteadyTime requestTime; - private final Timer createTime; + private final NanoTime createTime; private final TabletServer server; public UnloadTabletHandler(TabletServer server, KeyExtent extent, TUnloadTabletGoal goalState, @@ -46,7 +46,7 @@ class UnloadTabletHandler implements Runnable { this.goalState = goalState; this.server = server; this.requestTime = requestTime; - this.createTime = Timer.startNew(); + this.createTime = NanoTime.now(); } @Override