This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 328396b5e1 Improve tablet server batch reader iterator time tracking code (#4927)
328396b5e1 is described below
commit 328396b5e1074447e58d12219a38b36f0c021e4e
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 30 09:19:26 2025 -0400
Improve tablet server batch reader iterator time tracking code (#4927)
* Simplify code by using CountDownTimer instead of Timer
* Use countdown timer in TimeoutTracker nested class
* Refactor CountDownTimer to reuse Timer internally
---
.../TabletServerBatchReaderIterator.java | 38 +++++++++++-----------
.../apache/accumulo/core/util/CountDownTimer.java | 20 ++++++++++--
2 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index a7de7f5e76..fe77d0b4ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -80,6 +80,7 @@ import
org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
import org.apache.thrift.TApplicationException;
@@ -262,7 +263,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
ScanServerData ssd;
- Timer startTime = Timer.startNew();
+ CountDownTimer retryCountDownTimer = CountDownTimer.startNew(retryTimeout,
MILLISECONDS);
while (true) {
@@ -274,7 +275,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
failures = clientTabletCache.binRanges(context, ranges, binnedRanges);
ssd = new ScanServerData();
} else if
(options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
- ssd = binRangesForScanServers(clientTabletCache, ranges, binnedRanges,
startTime);
+ ssd = binRangesForScanServers(clientTabletCache, ranges, binnedRanges,
retryCountDownTimer);
failures = ssd.failures;
} else {
throw new IllegalStateException();
@@ -299,7 +300,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
failures.size(), tableId);
}
- if (startTime.elapsed(MILLISECONDS) > retryTimeout) {
+ if (retryCountDownTimer.isExpired()) {
// TODO exception used for timeout is inconsistent
throw new TimedOutException(
"Failed to find servers to process scans before timeout was
exceeded.");
@@ -650,10 +651,9 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
private ScanServerData binRangesForScanServers(ClientTabletCache
clientTabletCache,
- List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
Timer startTime)
- throws AccumuloException, TableNotFoundException,
AccumuloSecurityException,
- InvalidTabletHostingRequestException {
-
+ List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
+ CountDownTimer retryCountDownTimer) throws AccumuloException,
TableNotFoundException,
+ AccumuloSecurityException, InvalidTabletHostingRequestException {
ScanServerSelector ecsm = context.getScanServerSelector();
Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
@@ -697,7 +697,7 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
@Override
public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition,
Duration maxWaitTime,
String description) {
- Duration timeoutLeft = Duration.ofMillis(retryTimeout -
startTime.elapsed(MILLISECONDS));
+ Duration timeoutLeft = retryCountDownTimer.timeLeft();
return ThriftScanner.waitUntil(condition, maxWaitTime, description,
timeoutLeft, context,
tableId, log);
}
@@ -800,8 +800,8 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
String server;
Set<String> badServers;
final long timeOut;
- long activityTime;
- Long firstErrorTime = null;
+ CountDownTimer timeoutCountDownTimer;
+ CountDownTimer errorTimer;
TimeoutTracker(String server, Set<String> badServers, long timeOut) {
this(timeOut);
@@ -811,29 +811,29 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
TimeoutTracker(long timeOut) {
this.timeOut = timeOut;
+ this.timeoutCountDownTimer = CountDownTimer.startNew(timeOut,
MILLISECONDS);
}
void startingScan() {
- activityTime = System.currentTimeMillis();
+ timeoutCountDownTimer.restart();
}
void check() throws IOException {
- if (System.currentTimeMillis() - activityTime > timeOut) {
+ if (timeoutCountDownTimer.isExpired()) {
badServers.add(server);
- throw new IOException(
- "Time exceeded " + (System.currentTimeMillis() - activityTime) + "
" + server);
+ throw new IOException("Time exceeded " + timeOut + " ms for server " +
server);
}
}
void madeProgress() {
- activityTime = System.currentTimeMillis();
- firstErrorTime = null;
+ timeoutCountDownTimer.restart();
+ errorTimer = null;
}
void errorOccured() {
- if (firstErrorTime == null) {
- firstErrorTime = activityTime;
- } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+ if (errorTimer == null) {
+ errorTimer = CountDownTimer.startNew(timeOut, MILLISECONDS);
+ } else if (errorTimer.isExpired()) {
badServers.add(server);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java b/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
index 3c7c3792c1..5497e06a27 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/CountDownTimer.java
@@ -39,11 +39,11 @@ import com.google.common.base.Preconditions;
* </pre>
*/
public class CountDownTimer {
- private final long startNanos;
+ private final Timer timer;
private final long durationNanos;
private CountDownTimer(long durationNanos) {
- this.startNanos = System.nanoTime();
+ this.timer = Timer.startNew();
this.durationNanos = durationNanos;
}
@@ -68,12 +68,19 @@ public class CountDownTimer {
return new CountDownTimer(unit.toNanos(duration));
}
+ /**
+ * Resets the timer to the initial duration.
+ */
+ public void restart() {
+ timer.restart();
+ }
+
/**
* @param unit the desired {@link TimeUnit} for the returned time.
* @return the remaining time in the specified unit, or zero if expired.
*/
public long timeLeft(TimeUnit unit) {
- var elapsed = (System.nanoTime() - startNanos);
+ var elapsed = timer.elapsed(TimeUnit.NANOSECONDS);
var timeLeft = durationNanos - elapsed;
if (timeLeft < 0) {
timeLeft = 0;
@@ -82,6 +89,13 @@ public class CountDownTimer {
return unit.convert(timeLeft, TimeUnit.NANOSECONDS);
}
+ /**
+ * @return the remaining time, or zero if expired.
+ */
+ public Duration timeLeft() {
+ return Duration.ofNanos(timeLeft(TimeUnit.NANOSECONDS));
+ }
+
/**
* Checks if the countdown timer has expired.
*