This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new bd8a67fa4e ZooStore deferral time to use System.nanoTime() (#4126)
bd8a67fa4e is described below

commit bd8a67fa4e9763cc4a4bf1d8c2528167616b092c
Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com>
AuthorDate: Thu Jan 4 14:19:28 2024 -0500

    ZooStore deferral time to use System.nanoTime() (#4126)
    
    - ZooStore() now uses System.nanoTime() instead of
      System.currentTimeMillis()
    - Added TimeUnit param to unreserve()
    - Renamed 'defered' -> 'deferred'
---
 .../org/apache/accumulo/core/fate/AdminUtil.java   |  7 ++++---
 .../org/apache/accumulo/core/fate/AgeOffStore.java |  9 ++++----
 .../java/org/apache/accumulo/core/fate/Fate.java   | 13 ++++++------
 .../apache/accumulo/core/fate/ReadOnlyTStore.java  |  8 +++++---
 .../org/apache/accumulo/core/fate/ZooStore.java    | 24 +++++++++++++---------
 .../apache/accumulo/core/logging/FateLogger.java   |  5 +++--
 .../apache/accumulo/core/fate/AgeOffStoreTest.java | 17 +++++++--------
 .../org/apache/accumulo/core/fate/TestStore.java   |  3 ++-
 .../accumulo/shell/commands/FateCommandTest.java   |  3 ++-
 9 files changed, 51 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 0ce95f16f9..87d006c5f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 import org.apache.accumulo.core.fate.zookeeper.FateLock;
@@ -367,7 +368,7 @@ public class AdminUtil<T> {
 
       long timeCreated = zs.timeCreated(tid);
 
-      zs.unreserve(tid, 0);
+      zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
 
       if (includeByStatus(status, filterStatus) && includeByTxid(tid, 
filterTxid)) {
         statuses.add(new TransactionStatus(tid, status, txName, hlocks, 
wlocks, top, timeCreated));
@@ -450,7 +451,7 @@ public class AdminUtil<T> {
         break;
     }
 
-    zs.unreserve(txid, 0);
+    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
     return state;
   }
 
@@ -494,7 +495,7 @@ public class AdminUtil<T> {
         break;
     }
 
-    zs.unreserve(txid, 0);
+    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
     return state;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index 5ed59f21fe..ca016d0c9c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,7 +108,7 @@ public class AgeOffStore<T> implements TStore<T> {
           }
 
         } finally {
-          store.unreserve(txid, 0);
+          store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
         }
       } catch (Exception e) {
         log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
@@ -137,7 +138,7 @@ public class AgeOffStore<T> implements TStore<T> {
             break;
         }
       } finally {
-        store.unreserve(txid, 0);
+        store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
       }
     }
   }
@@ -165,8 +166,8 @@ public class AgeOffStore<T> implements TStore<T> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime) {
-    store.unreserve(tid, deferTime);
+  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
+    store.unreserve(tid, deferTime, deferTimeUnit);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index e9cbd76844..8dadac916e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -131,7 +132,7 @@ public class Fate<T> {
           runnerLog.error("Uncaught exception in FATE runner thread.", e);
         } finally {
           if (tid != null) {
-            store.unreserve(tid, deferTime);
+            store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
           }
         }
       }
@@ -295,7 +296,7 @@ public class Fate<T> {
         store.setStatus(tid, SUBMITTED);
       }
     } finally {
-      store.unreserve(tid, 0);
+      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
     }
 
   }
@@ -331,7 +332,7 @@ public class Fate<T> {
             return false;
           }
         } finally {
-          store.unreserve(tid, 0);
+          store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
         }
       } else {
         // reserved, lets retry.
@@ -362,7 +363,7 @@ public class Fate<T> {
           break;
       }
     } finally {
-      store.unreserve(tid, 0);
+      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -375,7 +376,7 @@ public class Fate<T> {
       }
       return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
     } finally {
-      store.unreserve(tid, 0);
+      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -389,7 +390,7 @@ public class Fate<T> {
       }
       return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
     } finally {
-      store.unreserve(tid, 0);
+      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
index e4f55e4b16..4a216f1e36 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.fate;
 import java.io.Serializable;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Read only access to a Transaction Store.
@@ -76,10 +77,11 @@ public interface ReadOnlyTStore<T> {
    * longer interact with it.
    *
    * @param tid transaction id, previously reserved.
-   * @param deferTime time in millis to keep this transaction out of the pool 
used in the
-   *        {@link #reserve() reserve} method. must be non-negative.
+   * @param deferTime time to keep this transaction out of the pool used in 
the {@link #reserve()
+   *        reserve} method. must be non-negative.
+   * @param deferTimeUnit the time unit of deferTime
    */
-  void unreserve(long tid, long deferTime);
+  void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit);
 
   /**
    * Get the current operation for the given transaction id.
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index da8572c7cb..219581268c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -61,7 +62,7 @@ public class ZooStore<T> implements TStore<T> {
   private ZooReaderWriter zk;
   private String lastReserved = "";
   private Set<Long> reserved;
-  private Map<Long,Long> defered;
+  private Map<Long,Long> deferred;
   private static final SecureRandom random = new SecureRandom();
   private long statusChangeEvents = 0;
   private int reservationsWaiting = 0;
@@ -106,7 +107,7 @@ public class ZooStore<T> implements TStore<T> {
     this.path = path;
     this.zk = zk;
     this.reserved = new HashSet<>();
-    this.defered = new HashMap<>();
+    this.deferred = new HashMap<>();
 
     zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
   }
@@ -163,9 +164,9 @@ public class ZooStore<T> implements TStore<T> {
               continue;
             }
 
-            if (defered.containsKey(tid)) {
-              if (defered.get(tid) < System.currentTimeMillis()) {
-                defered.remove(tid);
+            if (deferred.containsKey(tid)) {
+              if ((deferred.get(tid) - System.nanoTime()) < 0) {
+                deferred.remove(tid);
               } else {
                 continue;
               }
@@ -200,11 +201,13 @@ public class ZooStore<T> implements TStore<T> {
         synchronized (this) {
           // suppress lgtm alert - synchronized variable is not always true
           if (events == statusChangeEvents) { // lgtm 
[java/constant-comparison]
-            if (defered.isEmpty()) {
+            if (deferred.isEmpty()) {
               this.wait(5000);
             } else {
-              Long minTime = Collections.min(defered.values());
-              long waitTime = minTime - System.currentTimeMillis();
+              long currTime = System.nanoTime();
+              long minWait =
+                  deferred.values().stream().mapToLong(l -> l - 
currTime).min().getAsLong();
+              long waitTime = TimeUnit.MILLISECONDS.convert(minWait, 
TimeUnit.NANOSECONDS);
               if (waitTime > 0) {
                 this.wait(Math.min(waitTime, 5000));
               }
@@ -271,7 +274,8 @@ public class ZooStore<T> implements TStore<T> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime) {
+  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
+    deferTime = TimeUnit.NANOSECONDS.convert(deferTime, deferTimeUnit);
 
     if (deferTime < 0) {
       throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
@@ -284,7 +288,7 @@ public class ZooStore<T> implements TStore<T> {
       }
 
       if (deferTime > 0) {
-        defered.put(tid, System.currentTimeMillis() + deferTime);
+        deferred.put(tid, System.nanoTime() + deferTime);
       }
 
       this.notifyAll();
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index fd31a95e6c..ccad01a7f1 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -23,6 +23,7 @@ import static 
org.apache.accumulo.core.fate.FateTxId.formatTid;
 import java.io.Serializable;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.accumulo.core.fate.Fate;
@@ -61,8 +62,8 @@ public class FateLogger {
       }
 
       @Override
-      public void unreserve(long tid, long deferTime) {
-        store.unreserve(tid, deferTime);
+      public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
+        store.unreserve(tid, deferTime, deferTimeUnit);
       }
 
       @Override
diff --git 
a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java 
b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
index f36d7494b4..c2b086ee34 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
 import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
@@ -52,7 +53,7 @@ public class AgeOffStoreTest {
     long txid1 = aoStore.create();
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
-    aoStore.unreserve(txid1, 0);
+    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 
     aoStore.ageOff();
 
@@ -60,7 +61,7 @@ public class AgeOffStoreTest {
     aoStore.reserve(txid2);
     aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
     aoStore.setStatus(txid2, TStatus.FAILED);
-    aoStore.unreserve(txid2, 0);
+    aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 
     tts.time = 6;
 
@@ -68,7 +69,7 @@ public class AgeOffStoreTest {
     aoStore.reserve(txid3);
     aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
     aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
-    aoStore.unreserve(txid3, 0);
+    aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 
     Long txid4 = aoStore.create();
 
@@ -101,19 +102,19 @@ public class AgeOffStoreTest {
     long txid1 = testStore.create();
     testStore.reserve(txid1);
     testStore.setStatus(txid1, TStatus.IN_PROGRESS);
-    testStore.unreserve(txid1, 0);
+    testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 
     long txid2 = testStore.create();
     testStore.reserve(txid2);
     testStore.setStatus(txid2, TStatus.IN_PROGRESS);
     testStore.setStatus(txid2, TStatus.FAILED);
-    testStore.unreserve(txid2, 0);
+    testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 
     long txid3 = testStore.create();
     testStore.reserve(txid3);
     testStore.setStatus(txid3, TStatus.IN_PROGRESS);
     testStore.setStatus(txid3, TStatus.SUCCESSFUL);
-    testStore.unreserve(txid3, 0);
+    testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 
     Long txid4 = testStore.create();
 
@@ -136,7 +137,7 @@ public class AgeOffStoreTest {
 
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
-    aoStore.unreserve(txid1, 0);
+    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 
     tts.time = 30;
 
@@ -147,7 +148,7 @@ public class AgeOffStoreTest {
 
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.FAILED);
-    aoStore.unreserve(txid1, 0);
+    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 
     aoStore.ageOff();
 
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 9f6d44b27c..3253c41a90 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Transient in memory store for transactions.
@@ -61,7 +62,7 @@ public class TestStore extends ZooStore<String> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime) {
+  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
     if (!reserved.remove(tid)) {
       throw new IllegalStateException();
     }
diff --git 
a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java 
b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
index 0deacfb47a..379dd5b5b1 100644
--- 
a/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
+++ 
b/shell/src/test/java/org/apache/accumulo/shell/commands/FateCommandTest.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.Files;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -144,7 +145,7 @@ public class FateCommandTest {
     expectLastCall().once();
     zs.setStatus(tid, ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS);
     expectLastCall().once();
-    zs.unreserve(tid, 0);
+    zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
     expectLastCall().once();
 
     TestHelper helper = new TestHelper(true);

Reply via email to