This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 6dcf84ed00 Create a SteadyTime type in the Manager (#4494) 6dcf84ed00 is described below commit 6dcf84ed00eca6129eee6a559beebd625dadf966 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri May 10 09:55:34 2024 -0400 Create a SteadyTime type in the Manager (#4494) * Create a SteadyTime type in the Manager This replaces the Manager.getSteadyTime() long value with a concrete type to make it more apparent what time is being used. The serialization and deserialization logic has been encapsulated in methods to make the conversion consistent. This closes #4482 --- .../apache/accumulo/core/util/time/SteadyTime.java | 84 +++++++++++++ .../accumulo/core/util/time/SteadyTimeTest.java | 56 +++++++++ server/manager/pom.xml | 5 + .../java/org/apache/accumulo/manager/Manager.java | 9 +- .../org/apache/accumulo/manager/ManagerTime.java | 120 ++++++++++++++++--- .../accumulo/manager/TabletGroupWatcher.java | 6 +- .../apache/accumulo/manager/ManagerTimeTest.java | 130 +++++++++++++++++++++ 7 files changed, 389 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java new file mode 100644 index 0000000000..a94ae0ce55 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.time; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; + +/** + * SteadyTime represents an approximation of the total duration of time this cluster has had a + * Manager. Because this represents an elapsed time, it is guaranteed to not be negative. SteadyTime + * is not expected to represent real-world date times; its main use is for computing deltas similar + * to System.nanoTime, but across JVM processes. + */ +public class SteadyTime implements Comparable<SteadyTime> { + + private final Duration time; + + private SteadyTime(Duration time) { + Preconditions.checkArgument(!time.isNegative(), "SteadyTime '%s' should not be negative.", + time.toNanos()); + this.time = time; + } + + public long getMillis() { + return time.toMillis(); + } + + public long getNanos() { + return time.toNanos(); + } + + public Duration getDuration() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SteadyTime that = (SteadyTime) o; + return Objects.equals(time, that.time); + } + + @Override + public int hashCode() { + return Objects.hashCode(time); + } + + @Override + public int compareTo(SteadyTime other) { + return time.compareTo(other.time); + } + + public static SteadyTime from(long time, TimeUnit unit) { + return new SteadyTime(Duration.of(time, 
unit.toChronoUnit())); + } + + public static SteadyTime 
from(Duration time) { + return new SteadyTime(time); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/util/time/SteadyTimeTest.java b/core/src/test/java/org/apache/accumulo/core/util/time/SteadyTimeTest.java new file mode 100644 index 0000000000..016c771a8d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/time/SteadyTimeTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.util.time; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +public class SteadyTimeTest { + + @Test + public void testSteadyTime() { + long time = 20_000; + var steadyTime = SteadyTime.from(time, TimeUnit.NANOSECONDS); + + assertEquals(time, steadyTime.getNanos()); + assertEquals(TimeUnit.NANOSECONDS.toMillis(time), steadyTime.getMillis()); + assertEquals(Duration.ofNanos(time), steadyTime.getDuration()); + + // Verify equals and compareTo work correctly for same + var steadyTime2 = SteadyTime.from(time, TimeUnit.NANOSECONDS); + assertEquals(steadyTime, steadyTime2); + assertEquals(0, steadyTime.compareTo(steadyTime2)); + + // Check equals/compareto different objects + var steadyTime3 = SteadyTime.from(time + 100, TimeUnit.NANOSECONDS); + assertNotEquals(steadyTime, steadyTime3); + assertTrue(steadyTime.compareTo(steadyTime3) < 1); + + // Negatives are not allowed + assertThrows(IllegalArgumentException.class, () -> SteadyTime.from(-100, TimeUnit.NANOSECONDS)); + } + +} diff --git a/server/manager/pom.xml b/server/manager/pom.xml index 0511f60527..0213cf2fc9 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -128,5 +128,10 @@ <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5c6cc2f436..ded6d62f83 100644 --- 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -114,6 +114,7 @@ import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.time.NanoTime; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.state.TableCounts; @@ -1726,11 +1727,11 @@ public class Manager extends AbstractServer } /** - * Return how long (in milliseconds) there has been a manager overseeing this cluster. This is an - * approximately monotonic clock, which will be approximately consistent between different - * managers or different runs of the same manager. + * Return how long there has been a manager overseeing this cluster. This is an approximately + * monotonic clock, which will be approximately consistent between different managers or different + * runs of the same manager. SteadyTime supports both nanoseconds and milliseconds. 
*/ - public Long getSteadyTime() { + public SteadyTime getSteadyTime() { return timeKeeper.getTime(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java index 8bd842fc9d..fa7020a0ef 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java @@ -20,11 +20,12 @@ package org.apache.accumulo.manager; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -32,9 +33,13 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Keep a persistent roughly monotone view of how long a manager has been overseeing this cluster. */ @@ -47,9 +52,41 @@ public class ManagerTime { /** * Difference between time stored in ZooKeeper and System.nanoTime() when we last read from - * ZooKeeper. + * ZooKeeper. 
This offset may be negative or positive (depending on whether the current System.nanoTime() + * value is greater or less than the persisted SteadyTime; System.nanoTime() itself may also be + * negative) and is represented as a Duration to make computing future updates to the skewAmount + * and SteadyTime simpler. + * <p> + * Example where the skewAmount would be negative: + * <ul> + * <li>There's an existing persisted SteadyTime duration stored in ZooKeeper from the total + * previous manager runs of 1,000,000</li> + * <li>Manager starts up, reads the previous value, and then gets the current nano time, which is + * 2,000,000</li> + * <li>The skew gets computed as the previous steady time duration minus the current time, so that + * becomes: 1,000,000 - 2,000,000 = -1,000,000, resulting in the skew value being negative 1 + * million in this case</li> + * <li>When reading the current SteadyTime from the API, a new SteadyTime is computed by adding + * the current nano time and the skew. So let's say 100,000 ns have elapsed since the start, so + * the current time is now 2,100,000. This results in: (-1,000,000) + 2,100,000 = 1,100,000. You + * end up with 1.1 million as a SteadyTime value that is the current elapsed time of 100,000 for + * the current manager run plus the previous SteadyTime of 1 million that was read on start.</li> + * </ul> + * + * Example where the skewAmount would be positive: + * <ul> + * <li>The current persisted value from previous runs is 1,000,000</li> + * <li>Manager starts up and gets the current nano time, which is -2,000,000</li> + * <li>The skew gets computed as: 1,000,000 - (-2,000,000) = 3,000,000, resulting in the skew value + * being positive 3 million in this case</li> + * <li>When reading the current SteadyTime from the API, a new SteadyTime is computed by adding + * the current nano time and the skew. So let's say 100,000 ns have elapsed since the start, so + * the current time is now -1,900,000. 
This results in: (3,000,000) + (-1,900,000) = 1,100,000.
+ * You end up with 1.1 million as a SteadyTime value that is the current elapsed time of 100,000 + * for the current manager run plus the previous SteadyTime of 1 million that was read on + * start.</li> + * </ul> */ - private final AtomicLong skewAmount; + private final AtomicReference<Duration> skewAmount; public ManagerTime(Manager manager, AccumuloConfiguration conf) throws IOException { this.zPath = manager.getZooKeeperRoot() + Constants.ZMANAGER_TICK; @@ -58,24 +95,23 @@ public class ManagerTime { try { zk.putPersistentData(zPath, "0".getBytes(UTF_8), NodeExistsPolicy.SKIP); - skewAmount = - new AtomicLong(Long.parseLong(new String(zk.getData(zPath), UTF_8)) - System.nanoTime()); + skewAmount = new AtomicReference<>(updateSkew(getZkTime())); } catch (Exception ex) { throw new IOException("Error updating manager time", ex); } ThreadPools.watchCriticalScheduledTask(manager.getContext().getScheduledExecutor() - .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time keeper", () -> run()), 0, + .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time keeper", this::run), 0, SECONDS.toMillis(10), MILLISECONDS)); } /** * How long has this cluster had a Manager? * - * @return Approximate total duration this cluster has had a Manager, in milliseconds. 
+ * @return Approximate total duration this cluster has had a Manager */ - public long getTime() { - return NANOSECONDS.toMillis(System.nanoTime() + skewAmount.get()); + public SteadyTime getTime() { + return fromSkew(skewAmount.get()); } public void run() { @@ -86,8 +122,7 @@ public class ManagerTime { case INITIAL: case STOP: try { - long zkTime = Long.parseLong(new String(zk.getData(zPath), UTF_8)); - skewAmount.set(zkTime - System.nanoTime()); + skewAmount.set(updateSkew(getZkTime())); } catch (Exception ex) { if (log.isDebugEnabled()) { log.debug("Failed to retrieve manager tick time", ex); @@ -101,8 +136,7 @@ public class ManagerTime { case UNLOAD_METADATA_TABLETS: case UNLOAD_ROOT_TABLET: try { - zk.putPersistentData(zPath, - Long.toString(System.nanoTime() + skewAmount.get()).getBytes(UTF_8), + zk.putPersistentData(zPath, serialize(fromSkew(skewAmount.get())), NodeExistsPolicy.OVERWRITE); } catch (Exception ex) { if (log.isDebugEnabled()) { @@ -111,4 +145,62 @@ public class ManagerTime { } } } + + private SteadyTime getZkTime() throws InterruptedException, KeeperException { + return deserialize(zk.getData(zPath)); + } + + /** + * Creates a new skewAmount from an existing SteadyTime steadyTime - System.nanoTime() + * + * @param steadyTime existing steadyTime + * @return Updated skew + */ + @VisibleForTesting + static Duration updateSkew(SteadyTime steadyTime) { + return updateSkew(steadyTime, System.nanoTime()); + } + + /** + * Creates a new skewAmount from an existing SteadyTime by subtracting the given time value + * + * @param steadyTime existing steadyTime + * @param time time to subtract to update skew + * @return Updated skew + */ + @VisibleForTesting + static Duration updateSkew(SteadyTime steadyTime, long time) { + return Duration.ofNanos(steadyTime.getNanos() - time); + } + + /** + * Create a new SteadyTime from a skewAmount using System.nanoTime() + skewAmount + * + * @param skewAmount the skew amount to add + * @return A SteadyTime that has been 
skewed by the given skewAmount + */ + @VisibleForTesting + static SteadyTime fromSkew(Duration skewAmount) { + return fromSkew(System.nanoTime(), skewAmount); + } + + /** + * Create a new SteadyTime from a given time in ns and skewAmount using time + skewAmount + * + * @param time time to add the skew amount to + * @param skewAmount the skew amount to add + * @return A SteadyTime that has been skewed by the given skewAmount + */ + @VisibleForTesting + static SteadyTime fromSkew(long time, Duration skewAmount) { + return SteadyTime.from(skewAmount.plusNanos(time)); + } + + static SteadyTime deserialize(byte[] steadyTime) { + return SteadyTime.from(Long.parseLong(new String(steadyTime, UTF_8)), TimeUnit.NANOSECONDS); + } + + static byte[] serialize(SteadyTime steadyTime) { + return Long.toString(steadyTime.getNanos()).getBytes(UTF_8); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index c3a77f2297..530bd950b1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -355,7 +355,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { Manager.log.trace("[{}] Requesting TabletServer {} unload {} {}", store.name(), location.getServerInstance(), tls.extent, goal.howUnload()); client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(), - manager.getSteadyTime()); + manager.getSteadyTime().getMillis()); unloaded++; totalUnloaded++; } catch (TException tException) { @@ -454,7 +454,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private void hostSuspendedTablet(TabletLists tLists, TabletLocationState tls, Location location, TableConfiguration tableConf) { - if (manager.getSteadyTime() - tls.suspend.suspensionTime + if (manager.getSteadyTime().getMillis() - 
tls.suspend.suspensionTime < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { // Tablet is suspended. See if its tablet server is back. TServerInstance returnInstance = null; @@ -1386,7 +1386,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { deadTablets.subList(0, maxServersToShow)); Manager.log.debug("logs for dead servers: {}", deadLogs); if (canSuspendTablets()) { - store.suspend(deadTablets, deadLogs, manager.getSteadyTime()); + store.suspend(deadTablets, deadLogs, manager.getSteadyTime().getMillis()); } else { store.unassign(deadTablets, deadLogs); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java new file mode 100644 index 0000000000..528eccca6d --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/ManagerTimeTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.util.time.SteadyTime; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class ManagerTimeTest { + + @Test + public void testSteadyTime() { + long time = 20_000; + var steadyTime = SteadyTime.from(time, TimeUnit.NANOSECONDS); + + // make sure calling serialize on instance matches static helper + byte[] serialized = ManagerTime.serialize(steadyTime); + assertArrayEquals(serialized, ManagerTime.serialize(steadyTime)); + + // Verify deserialization matches original object + var deserialized = ManagerTime.deserialize(serialized); + assertEquals(steadyTime, deserialized); + assertEquals(0, steadyTime.compareTo(deserialized)); + } + + @ParameterizedTest + // Test with both a 0 and positive previous value. This simulates the value + // read out of zookeeper for the time and should never be negative as it is + // based on elapsed time from previous manager runs + @ValueSource(longs = {0, 50_000}) + public void testSteadyTimeFromSkew(long previousTime) throws InterruptedException { + List<Long> times = List.of(-100_000L, -100L, 0L, 20_000L, System.nanoTime()); + + for (Long time : times) { + // ManagerTime builds the skew amount by subtracting the current nanotime + // from the previous persisted time in ZK. 
The skew can be negative or positive because + // it will depend on if the current nanotime is negative or positive as + // nanotime is allowed to be negative + var skewAmount = + ManagerTime.updateSkew(SteadyTime.from(previousTime, TimeUnit.NANOSECONDS), time); + + // Build a SteadyTime using the skewAmount + // SteadyTime should never be negative + var original = ManagerTime.fromSkew(time, skewAmount); + + // Simulate a future time and create another SteadyTime from the skew which should + // now be after the original + time = time + 10000; + var futureSkew = ManagerTime.fromSkew(time, skewAmount); + + // future should be after the original + assertTrue(futureSkew.compareTo(original) > 0); + } + } + + @ParameterizedTest + // Test with both a 0 and positive previous value. This simulates the value + // read out of zookeeper for the time and should never be negative as it is + // based on elapsed time from previous manager runs + @ValueSource(longs = {0, 50_000}) + public void testSteadyTimeFromSkewCurrent(long previousTime) throws InterruptedException { + // Also test fromSkew(skewAmount) method which only uses System.nanoTime() + var skewAmount = ManagerTime.updateSkew(SteadyTime.from(previousTime, TimeUnit.NANOSECONDS)); + + // Build a SteadyTime using the skewAmount and current time + var original = ManagerTime.fromSkew(skewAmount); + + // sleep a bit so time elapses + Thread.sleep(10); + var futureSkew = ManagerTime.fromSkew(skewAmount); + + // future should be after the original + assertTrue(futureSkew.compareTo(original) > 0); + } + + @ParameterizedTest + // Test with both a 0 and positive previous value. 
This simulates the value + // read out of zookeeper for the time and should never be negative as it is + // based on elapsed time from previous manager runs + @ValueSource(longs = {0, 50_000}) + public void testSteadyTimeUpdateSkew(long previousTime) throws InterruptedException { + + var steadyTime = SteadyTime.from(previousTime, TimeUnit.NANOSECONDS); + List<Long> times = List.of(-100_000L, -100L, 0L, 20_000L, System.nanoTime()); + + // test updateSkew with various times and previous times + for (Long time : times) { + var expected = steadyTime.getNanos() - time; + + // test that updateSkew computes the update as current steadyTime - time + var skewAmount = ManagerTime.updateSkew(steadyTime, time); + assertEquals(expected, skewAmount.toNanos()); + } + + // test updateSkew with current system time + var skew = ManagerTime.updateSkew(steadyTime); + // sleep a bit so time elapses + Thread.sleep(10); + var updatedSkew = ManagerTime.updateSkew(steadyTime); + // Updating the skew time subtracts the current nanotime from + // the previous value so the updated value should be less than + // a previously created SteadyTime based on the same skew + assertTrue(skew.toNanos() - updatedSkew.toNanos() > 0); + assertTrue(skew.compareTo(updatedSkew) > 0); + } +}