This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new bd8a67fa4e ZooStore deferral time to use System.nanoTime() (#4126) bd8a67fa4e is described below commit bd8a67fa4e9763cc4a4bf1d8c2528167616b092c Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Thu Jan 4 14:19:28 2024 -0500 ZooStore deferral time to use System.nanoTime() (#4126) - ZooStore() now uses System.nanoTime() instead of System.currentTimeMillis() - Added TimeUnit param to unreserve() - Renamed 'defered' -> 'deferred' --- .../org/apache/accumulo/core/fate/AdminUtil.java | 7 ++++--- .../org/apache/accumulo/core/fate/AgeOffStore.java | 9 ++++---- .../java/org/apache/accumulo/core/fate/Fate.java | 13 ++++++------ .../apache/accumulo/core/fate/ReadOnlyTStore.java | 8 +++++--- .../org/apache/accumulo/core/fate/ZooStore.java | 24 +++++++++++++--------- .../apache/accumulo/core/logging/FateLogger.java | 5 +++-- .../apache/accumulo/core/fate/AgeOffStoreTest.java | 17 +++++++-------- .../org/apache/accumulo/core/fate/TestStore.java | 3 ++- .../accumulo/shell/commands/FateCommandTest.java | 3 ++- 9 files changed, 51 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 0ce95f16f9..87d006c5f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; @@ -367,7 +368,7 @@ public class AdminUtil<T> { long timeCreated = zs.timeCreated(tid); - zs.unreserve(tid, 0); + zs.unreserve(tid, 0, TimeUnit.MILLISECONDS); if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) { statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated)); @@ -450,7 +451,7 @@ public class AdminUtil<T> { break; } - zs.unreserve(txid, 0); + zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } @@ -494,7 +495,7 @@ public class AdminUtil<T> { break; } - zs.unreserve(txid, 0); + zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index 5ed59f21fe..ca016d0c9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +108,7 @@ public class AgeOffStore<T> implements TStore<T> { } } finally { - store.unreserve(txid, 0); + store.unreserve(txid, 0, TimeUnit.MILLISECONDS); } } catch (Exception e) { log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); @@ -137,7 +138,7 @@ public class AgeOffStore<T> implements TStore<T> { break; } } finally { - store.unreserve(txid, 0); + store.unreserve(txid, 0, TimeUnit.MILLISECONDS); } } } @@ -165,8 +166,8 @@ public class AgeOffStore<T> implements TStore<T> { } @Override - public void unreserve(long tid, long deferTime) { - store.unreserve(tid, deferTime); + public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { + store.unreserve(tid, deferTime, deferTimeUnit); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index e9cbd76844..8dadac916e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -131,7 +132,7 @@ public class Fate<T> { runnerLog.error("Uncaught exception in FATE runner thread.", e); } finally { if (tid != null) { - store.unreserve(tid, deferTime); + store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS); } } } @@ -295,7 +296,7 @@ public class Fate<T> { store.setStatus(tid, SUBMITTED); } } finally { - store.unreserve(tid, 0); + store.unreserve(tid, 0, TimeUnit.MILLISECONDS); } } @@ -331,7 +332,7 @@ public class Fate<T> { return false; } } finally { - store.unreserve(tid, 0); + store.unreserve(tid, 0, TimeUnit.MILLISECONDS); } } else { // reserved, lets retry. @@ -362,7 +363,7 @@ public class Fate<T> { break; } } finally { - store.unreserve(tid, 0); + store.unreserve(tid, 0, TimeUnit.MILLISECONDS); } } @@ -375,7 +376,7 @@ public class Fate<T> { } return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE); } finally { - store.unreserve(tid, 0); + store.unreserve(tid, 0, TimeUnit.MILLISECONDS); } } @@ -389,7 +390,7 @@ public class Fate<T> { } return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION); } finally { - store.unreserve(tid, 0); + store.unreserve(tid, 0, TimeUnit.MILLISECONDS); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java index e4f55e4b16..4a216f1e36 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.fate; import java.io.Serializable; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Read only access to a Transaction Store. @@ -76,10 +77,11 @@ public interface ReadOnlyTStore<T> { * longer interact with it. * * @param tid transaction id, previously reserved. - * @param deferTime time in millis to keep this transaction out of the pool used in the - * {@link #reserve() reserve} method. must be non-negative. + * @param deferTime time to keep this transaction out of the pool used in the {@link #reserve() + * reserve} method. must be non-negative. + * @param deferTimeUnit the time unit of deferTime */ - void unreserve(long tid, long deferTime); + void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit); /** * Get the current operation for the given transaction id. diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index da8572c7cb..219581268c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -61,7 +62,7 @@ public class ZooStore<T> implements TStore<T> { private ZooReaderWriter zk; private String lastReserved = ""; private Set<Long> reserved; - private Map<Long,Long> defered; + private Map<Long,Long> deferred; private static final SecureRandom random = new SecureRandom(); private long statusChangeEvents = 0; private int reservationsWaiting = 0; @@ -106,7 +107,7 @@ public class ZooStore<T> implements TStore<T> { this.path = path; this.zk = zk; this.reserved = new HashSet<>(); - this.defered = new HashMap<>(); + this.deferred = new HashMap<>(); zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -163,9 +164,9 @@ public class ZooStore<T> implements TStore<T> { continue; } - if (defered.containsKey(tid)) { - if (defered.get(tid) < System.currentTimeMillis()) { - defered.remove(tid); + if (deferred.containsKey(tid)) { + if ((deferred.get(tid) - System.nanoTime()) < 0) { + deferred.remove(tid); } else { continue; } @@ -200,11 +201,13 @@ public class ZooStore<T> implements TStore<T> { synchronized (this) { // suppress lgtm alert - synchronized variable is not always true if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - if (defered.isEmpty()) { + if (deferred.isEmpty()) { this.wait(5000); } else { - Long minTime = Collections.min(defered.values()); - long waitTime = minTime - System.currentTimeMillis(); + long currTime = System.nanoTime(); + long minWait = + deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong(); + long waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS); if (waitTime > 0) { this.wait(Math.min(waitTime, 5000)); } @@ -271,7 +274,8 @@ public class ZooStore<T> implements TStore<T> { } @Override - public void unreserve(long tid, long deferTime) { + public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { + deferTime = TimeUnit.NANOSECONDS.convert(deferTime, deferTimeUnit); if (deferTime < 0) { throw new IllegalArgumentException("deferTime < 0 : " + deferTime); @@ -284,7 +288,7 @@ public class ZooStore<T> implements TStore<T> { } if (deferTime > 0) { - defered.put(tid, System.currentTimeMillis() + deferTime); + deferred.put(tid, System.nanoTime() + deferTime); } this.notifyAll(); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index fd31a95e6c..ccad01a7f1 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.fate.FateTxId.formatTid; import java.io.Serializable; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.accumulo.core.fate.Fate; @@ -61,8 +62,8 @@ public class FateLogger { } @Override - public void unreserve(long tid, long deferTime) { - store.unreserve(tid, deferTime); + public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { + store.unreserve(tid, deferTime, deferTimeUnit); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index f36d7494b4..c2b086ee34 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; @@ -52,7 +53,7 @@ public class AgeOffStoreTest { long txid1 = aoStore.create(); aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.IN_PROGRESS); - aoStore.unreserve(txid1, 0); + aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); aoStore.ageOff(); @@ -60,7 +61,7 @@ public class AgeOffStoreTest { aoStore.reserve(txid2); aoStore.setStatus(txid2, TStatus.IN_PROGRESS); aoStore.setStatus(txid2, TStatus.FAILED); - aoStore.unreserve(txid2, 0); + aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS); tts.time = 6; @@ -68,7 +69,7 @@ public class AgeOffStoreTest { aoStore.reserve(txid3); aoStore.setStatus(txid3, TStatus.IN_PROGRESS); aoStore.setStatus(txid3, TStatus.SUCCESSFUL); - aoStore.unreserve(txid3, 0); + aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS); Long txid4 = aoStore.create(); @@ -101,19 +102,19 @@ public class AgeOffStoreTest { long txid1 = testStore.create(); testStore.reserve(txid1); testStore.setStatus(txid1, TStatus.IN_PROGRESS); - testStore.unreserve(txid1, 0); + testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); long txid2 = testStore.create(); testStore.reserve(txid2); testStore.setStatus(txid2, TStatus.IN_PROGRESS); testStore.setStatus(txid2, TStatus.FAILED); - testStore.unreserve(txid2, 0); + testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS); long txid3 = testStore.create(); testStore.reserve(txid3); testStore.setStatus(txid3, TStatus.IN_PROGRESS); testStore.setStatus(txid3, TStatus.SUCCESSFUL); - testStore.unreserve(txid3, 0); + testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS); Long txid4 = testStore.create(); @@ -136,7 +137,7 @@ public class AgeOffStoreTest { aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS); - aoStore.unreserve(txid1, 0); + aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); tts.time = 30; @@ -147,7 +148,7 @@ public class AgeOffStoreTest { aoStore.reserve(txid1); aoStore.setStatus(txid1, TStatus.FAILED); - aoStore.unreserve(txid1, 0); + aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); aoStore.ageOff(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 9f6d44b27c..3253c41a90 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * Transient in memory store for transactions. @@ -61,7 +62,7 @@ public class TestStore extends ZooStore<String> { } @Override - public void unreserve(long tid, long deferTime) { + public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { if (!reserved.remove(tid)) { throw new IllegalStateException(); } diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java index 0deacfb47a..379dd5b5b1 100644 --- a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java +++ b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.file.Files; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -144,7 +145,7 @@ public class FateCommandTest { expectLastCall().once(); zs.setStatus(tid, ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS); expectLastCall().once(); - zs.unreserve(tid, 0); + zs.unreserve(tid, 0, TimeUnit.MILLISECONDS); expectLastCall().once(); TestHelper helper = new TestHelper(true);