This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 328396b5e1 Improve tablet server batch reader iterator time tracking code (#4927)
328396b5e1 is described below

commit 328396b5e1074447e58d12219a38b36f0c021e4e
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 30 09:19:26 2025 -0400

    Improve tablet server batch reader iterator time tracking code (#4927)
    
    * Simplify code by using CountDownTimer instead of Timer
    
    * Use countdown timer in TimeoutTracker nested class
    
    * Refactor CountDownTimer to reuse Timer internally
---
 .../TabletServerBatchReaderIterator.java           | 38 +++++++++++-----------
 .../apache/accumulo/core/util/CountDownTimer.java  | 20 ++++++++++--
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index a7de7f5e76..fe77d0b4ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -80,6 +80,7 @@ import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CountDownTimer;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.Timer;
 import org.apache.thrift.TApplicationException;
@@ -262,7 +263,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
 
     ScanServerData ssd;
 
-    Timer startTime = Timer.startNew();
+    CountDownTimer retryCountDownTimer = CountDownTimer.startNew(retryTimeout, MILLISECONDS);
 
     while (true) {
 
@@ -274,7 +275,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         failures = clientTabletCache.binRanges(context, ranges, binnedRanges);
         ssd = new ScanServerData();
       } else if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
-        ssd = binRangesForScanServers(clientTabletCache, ranges, binnedRanges, startTime);
+        ssd = binRangesForScanServers(clientTabletCache, ranges, binnedRanges, retryCountDownTimer);
         failures = ssd.failures;
       } else {
         throw new IllegalStateException();
@@ -299,7 +300,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
               failures.size(), tableId);
         }
 
-        if (startTime.elapsed(MILLISECONDS) > retryTimeout) {
+        if (retryCountDownTimer.isExpired()) {
           // TODO exception used for timeout is inconsistent
           throw new TimedOutException(
               "Failed to find servers to process scans before timeout was exceeded.");
@@ -650,10 +651,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
   }
 
   private ScanServerData binRangesForScanServers(ClientTabletCache clientTabletCache,
-      List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, Timer startTime)
-      throws AccumuloException, TableNotFoundException, AccumuloSecurityException,
-      InvalidTabletHostingRequestException {
-
+      List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
+      CountDownTimer retryCountDownTimer) throws AccumuloException, TableNotFoundException,
+      AccumuloSecurityException, InvalidTabletHostingRequestException {
     ScanServerSelector ecsm = context.getScanServerSelector();
 
     Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
@@ -697,7 +697,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       @Override
       public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime,
           String description) {
-        Duration timeoutLeft = Duration.ofMillis(retryTimeout - startTime.elapsed(MILLISECONDS));
+        Duration timeoutLeft = retryCountDownTimer.timeLeft();
         return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context,
             tableId, log);
       }
@@ -800,8 +800,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
     String server;
     Set<String> badServers;
     final long timeOut;
-    long activityTime;
-    Long firstErrorTime = null;
+    CountDownTimer timeoutCountDownTimer;
+    CountDownTimer errorTimer;
 
     TimeoutTracker(String server, Set<String> badServers, long timeOut) {
       this(timeOut);
@@ -811,29 +811,29 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
 
     TimeoutTracker(long timeOut) {
       this.timeOut = timeOut;
+      this.timeoutCountDownTimer = CountDownTimer.startNew(timeOut, MILLISECONDS);
     }
 
     void startingScan() {
-      activityTime = System.currentTimeMillis();
+      timeoutCountDownTimer.restart();
     }
 
     void check() throws IOException {
-      if (System.currentTimeMillis() - activityTime > timeOut) {
+      if (timeoutCountDownTimer.isExpired()) {
         badServers.add(server);
-        throw new IOException(
-            "Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
+        throw new IOException("Time exceeded " + timeOut + " ms for server " + server);
       }
     }
 
     void madeProgress() {
-      activityTime = System.currentTimeMillis();
-      firstErrorTime = null;
+      timeoutCountDownTimer.restart();
+      errorTimer = null;
     }
 
     void errorOccured() {
-      if (firstErrorTime == null) {
-        firstErrorTime = activityTime;
-      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+      if (errorTimer == null) {
+        errorTimer = CountDownTimer.startNew(timeOut, MILLISECONDS);
+      } else if (errorTimer.isExpired()) {
         badServers.add(server);
       }
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java b/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
index 3c7c3792c1..5497e06a27 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
@@ -39,11 +39,11 @@ import com.google.common.base.Preconditions;
  * </pre>
  */
 public class CountDownTimer {
-  private final long startNanos;
+  private final Timer timer;
   private final long durationNanos;
 
   private CountDownTimer(long durationNanos) {
-    this.startNanos = System.nanoTime();
+    this.timer = Timer.startNew();
     this.durationNanos = durationNanos;
   }
 
@@ -68,12 +68,19 @@ public class CountDownTimer {
     return new CountDownTimer(unit.toNanos(duration));
   }
 
+  /**
+   * Resets the timer to the initial duration.
+   */
+  public void restart() {
+    timer.restart();
+  }
+
   /**
    * @param unit the desired {@link TimeUnit} for the returned time.
    * @return the remaining time in the specified unit, or zero if expired.
    */
   public long timeLeft(TimeUnit unit) {
-    var elapsed = (System.nanoTime() - startNanos);
+    var elapsed = timer.elapsed(TimeUnit.NANOSECONDS);
     var timeLeft = durationNanos - elapsed;
     if (timeLeft < 0) {
       timeLeft = 0;
@@ -82,6 +89,13 @@ public class CountDownTimer {
     return unit.convert(timeLeft, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * @return the remaining time, or zero if expired.
+   */
+  public Duration timeLeft() {
+    return Duration.ofNanos(timeLeft(TimeUnit.NANOSECONDS));
+  }
+
   /**
    * Checks if the countdown timer has expired.
    *

Reply via email to