This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new aed3919e46 Use Duration in Retry instead of long+TimeUnit (#4301) aed3919e46 is described below commit aed3919e4649a3bcd82c35e5d0c65b8192a086d2 Author: Dom G <domgargu...@apache.org> AuthorDate: Tue Feb 27 12:10:29 2024 -0500 Use Duration in Retry instead of long+TimeUnit (#4301) --- .../core/clientImpl/InstanceOperationsImpl.java | 10 +- .../core/clientImpl/NamespaceOperationsImpl.java | 9 +- .../core/clientImpl/TableOperationsImpl.java | 14 +- .../TabletServerBatchReaderIterator.java | 8 +- .../core/clientImpl/TabletServerBatchWriter.java | 9 +- .../accumulo/core/clientImpl/bulk/BulkImport.java | 9 +- .../accumulo/core/fate/zookeeper/ZooReader.java | 8 +- .../java/org/apache/accumulo/core/util/Retry.java | 112 +++++++------- .../org/apache/accumulo/core/util/RetryTest.java | 163 +++++++++++---------- .../server/compaction/RetryableThriftCall.java | 8 +- .../java/org/apache/accumulo/manager/Manager.java | 8 +- .../org/apache/accumulo/tserver/TabletServer.java | 18 +-- .../tserver/compactions/CompactionManager.java | 7 +- .../accumulo/tserver/session/SessionManager.java | 9 +- 14 files changed, 198 insertions(+), 194 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index ea9ff145ec..4e282ecec0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -20,15 +20,13 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -138,9 +136,9 @@ public class InstanceOperationsImpl implements InstanceOperations { var log = LoggerFactory.getLogger(InstanceOperationsImpl.class); - Retry retry = - Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS).incrementBy(25, MILLISECONDS) - .maxWait(30, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); while (true) { try { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java index 223b40aa4f..3c92fa5fc7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java @@ -20,13 +20,12 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.Validators.EXISTING_NAMESPACE_NAME; import static org.apache.accumulo.core.util.Validators.NEW_NAMESPACE_NAME; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -245,9 +244,9 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { EXISTING_NAMESPACE_NAME.validate(namespace); checkArgument(mapMutator != null, "mapMutator is null"); - Retry retry = - Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS).incrementBy(25, MILLISECONDS) - .maxWait(30, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); while (true) { try { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index ccdbc6ea78..8435df2a40 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -23,7 +23,6 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; @@ -37,6 +36,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1020,9 +1020,9 @@ public class TableOperationsImpl extends TableOperationsHelper { EXISTING_TABLE_NAME.validate(tableName); checkArgument(mapMutator != null, "mapMutator is null"); - Retry retry = - Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS).incrementBy(25, MILLISECONDS) - .maxWait(30, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); while (true) { try { @@ -1886,9 +1886,9 @@ public class TableOperationsImpl extends TableOperationsHelper { locator.invalidateCache(); - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) { context.requireTableExists(tableId, tableName); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 94a75d0dbb..5e71087ce8 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.core.clientImpl; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; @@ -251,9 +249,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value int lastFailureSize = Integer.MAX_VALUE; - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.07) - .logInterval(1, MINUTES).createFactory().createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(10)).backOffFactor(1.07) + .logInterval(Duration.ofMinutes(1)).createFactory().createRetry(); while (true) { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index f167b25323..c2543cb8ee 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -19,8 +19,6 @@ package org.apache.accumulo.core.clientImpl; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -29,6 +27,7 @@ import java.io.IOException; import java.lang.management.CompilationMXBean; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -1093,9 +1092,9 @@ public class TabletServerBatchWriter implements AutoCloseable { private void cancelSession() throws InterruptedException, ThriftSecurityException { - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(1)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); final HostAndPort parsedServer = HostAndPort.fromString(location); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 9d4fc24a0e..63ba6cb682 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -19,8 +19,6 @@ package org.apache.accumulo.core.clientImpl.bulk; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.groupingBy; import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; @@ -28,6 +26,7 @@ import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -142,9 +141,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti if (propValue != null) { maxTablets = Integer.parseInt(propValue); } - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(2)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); // retry if a merge occurs boolean shouldRetry = true; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReader.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReader.java index dbd80b210b..1df0b54bf3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReader.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReader.java @@ -19,9 +19,8 @@ package org.apache.accumulo.core.fate.zookeeper; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -42,8 +41,9 @@ public class ZooReader { private static final Logger log = LoggerFactory.getLogger(ZooReader.class); protected static final RetryFactory RETRY_FACTORY = - Retry.builder().maxRetries(10).retryAfter(250, MILLISECONDS).incrementBy(250, MILLISECONDS) - .maxWait(2, MINUTES).backOffFactor(1.5).logInterval(3, MINUTES).createFactory(); + Retry.builder().maxRetries(10).retryAfter(Duration.ofMillis(250)) + .incrementBy(Duration.ofMillis(250)).maxWait(Duration.ofMinutes(2)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createFactory(); protected final String keepers; protected final int timeout; diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java b/core/src/main/java/org/apache/accumulo/core/util/Retry.java index 4704ddfa92..28659b376f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java @@ -18,11 +18,9 @@ */ package org.apache.accumulo.core.util; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.slf4j.Logger; @@ -35,13 +33,13 @@ import com.google.common.base.Preconditions; */ public class Retry { private long maxRetries; // not final for testing - private long waitIncrement; // not final for testing - private long maxWait; // not final for testing - private final long logIntervalNanoSec; + private Duration waitIncrement; // not final for testing + private Duration maxWait; // not final for testing + private final Duration logInterval; private double backOffFactor; private long retriesDone; - private long currentWait; - private long initialWait; + private Duration currentWait; + private Duration initialWait; private boolean hasNeverLogged; private boolean hasLoggedWarn = false; @@ -56,15 +54,15 @@ public class Retry { * @param waitIncrement The amount of time (ms) to increment next wait time by * @param logInterval The amount of time (ms) between logging retries */ - private Retry(long maxRetries, long startWait, long waitIncrement, long maxWait, long logInterval, - double backOffFactor) { + private Retry(long maxRetries, Duration startWait, Duration waitIncrement, Duration maxWait, + Duration logInterval, double backOffFactor) { this.maxRetries = maxRetries; this.maxWait = maxWait; this.waitIncrement = waitIncrement; this.retriesDone = 0; this.currentWait = startWait; this.initialWait = startWait; - this.logIntervalNanoSec = MILLISECONDS.toNanos(logInterval); + this.logInterval = logInterval; this.hasNeverLogged = true; this.lastRetryLog = -1; this.backOffFactor = backOffFactor; @@ -93,19 +91,19 @@ public class Retry { // Visible for testing @VisibleForTesting - long getCurrentWait() { + Duration getCurrentWait() { return currentWait; } // Visible for testing @VisibleForTesting - long getWaitIncrement() { + Duration getWaitIncrement() { return waitIncrement; } // Visible for testing @VisibleForTesting - long getMaxWait() { + Duration getMaxWait() { return maxWait; } @@ -117,20 +115,20 @@ public class Retry { // Visible for testing @VisibleForTesting - void setStartWait(long startWait) { + void setStartWait(Duration startWait) { this.currentWait = startWait; this.initialWait = startWait; } // Visible for testing @VisibleForTesting - void setWaitIncrement(long waitIncrement) { + void setWaitIncrement(Duration waitIncrement) { this.waitIncrement = waitIncrement; } // Visible for testing @VisibleForTesting - void setMaxWait(long maxWait) { + void setMaxWait(Duration maxWait) { this.maxWait = maxWait; } @@ -144,8 +142,8 @@ public class Retry { return maxRetries < 0; } - public long getLogInterval() { - return NANOSECONDS.toMillis(logIntervalNanoSec); + public Duration getLogInterval() { + return logInterval; } public boolean canRetry() { @@ -176,23 +174,29 @@ public class Retry { } currentBackOffFactor = currentBackOffFactor * backOffFactor; - log.debug("Sleeping for {}ms before retrying operation : {} ", currentWait, + log.debug("Sleeping for {}ms before retrying operation : {} ", currentWait.toMillis(), operationDescription); sleep(currentWait); if (backOffFactor == 1) { - currentWait = Math.min(maxWait, currentWait + waitIncrement); + currentWait = currentWait.plus(waitIncrement); + if (currentWait.compareTo(maxWait) > 0) { + currentWait = maxWait; + } } else if (backOffFactor > 1.0) { - if (currentWait < maxWait) { - waitIncrement = (long) Math.ceil(waitFactor * this.initialWait); - currentWait = Math.min(maxWait, initialWait + waitIncrement); + waitIncrement = Duration.ofMillis((long) Math.ceil(waitFactor * initialWait.toMillis())); + Duration tempWait = initialWait.plus(waitIncrement); + if (tempWait.compareTo(maxWait) > 0) { + currentWait = maxWait; + } else { + currentWait = tempWait; } } } - protected void sleep(long wait) throws InterruptedException { - Thread.sleep(wait); + protected void sleep(Duration wait) throws InterruptedException { + Thread.sleep(wait.toMillis()); } public void logRetry(Logger log, String message, Throwable t) { @@ -204,7 +208,7 @@ public class Retry { } hasNeverLogged = false; lastRetryLog = now; - } else if ((now - lastRetryLog) > logIntervalNanoSec) { + } else if ((now - lastRetryLog) > logInterval.toNanos()) { log.warn(getMessage(message), t); lastRetryLog = now; hasLoggedWarn = true; @@ -224,7 +228,7 @@ public class Retry { } hasNeverLogged = false; lastRetryLog = now; - } else if ((now - lastRetryLog) > logIntervalNanoSec) { + } else if ((now - lastRetryLog) > logInterval.toNanos()) { log.warn(getMessage(message)); lastRetryLog = now; hasLoggedWarn = true; @@ -272,29 +276,27 @@ public class Retry { public interface NeedsRetryDelay { /** - * @param duration the amount of time to wait before the first retry; input is converted to - * milliseconds, rounded down to the nearest + * @param duration the amount of time to wait before the first retry * @return this builder with the initial wait period set */ - NeedsTimeIncrement retryAfter(long duration, TimeUnit unit); + NeedsTimeIncrement retryAfter(Duration duration); } public interface NeedsTimeIncrement { /** - * @param duration the amount of additional time to add before each subsequent retry; input is - * converted to milliseconds, rounded down to the nearest + * @param duration the amount of additional time to add before each subsequent retry * @return this builder with the increment amount set */ - NeedsMaxWait incrementBy(long duration, TimeUnit unit); + NeedsMaxWait incrementBy(Duration duration); } public interface NeedsMaxWait { /** * @param duration the maximum amount of time to which the waiting period between retries can be - * incremented; input is converted to milliseconds, rounded down to the nearest + * incremented * @return this builder with a maximum time limit set */ - NeedsBackOffFactor maxWait(long duration, TimeUnit unit); + NeedsBackOffFactor maxWait(Duration duration); } public interface NeedsBackOffFactor { @@ -308,11 +310,10 @@ public class Retry { public interface NeedsLogInterval { /** - * @param duration the minimum time interval between logging that a retry is occurring; input is - * converted to milliseconds, rounded down to the nearest + * @param duration the minimum time interval between logging that a retry is occurring * @return this builder with a logging interval set */ - BuilderDone logInterval(long duration, TimeUnit unit); + BuilderDone logInterval(Duration duration); } public interface BuilderDone { @@ -352,10 +353,10 @@ public class Retry { private boolean modifiable = true; private long maxRetries; - private long initialWait; - private long maxWait; - private long waitIncrement; - private long logInterval; + private Duration initialWait; + private Duration maxWait; + private Duration waitIncrement; + private Duration logInterval; private double backOffFactor = 1.5; RetryFactoryBuilder() {} @@ -381,19 +382,20 @@ public class Retry { } @Override - public NeedsTimeIncrement retryAfter(long duration, TimeUnit unit) { + public NeedsTimeIncrement retryAfter(Duration duration) { checkState(); - Preconditions.checkArgument(duration >= 0, "Initial waiting period must not be negative"); - this.initialWait = unit.toMillis(duration); + Preconditions.checkArgument(!duration.isNegative(), + "Initial waiting period must not be negative"); + this.initialWait = duration; return this; } @Override - public NeedsMaxWait incrementBy(long duration, TimeUnit unit) { + public NeedsMaxWait incrementBy(Duration duration) { checkState(); - Preconditions.checkArgument(duration >= 0, + Preconditions.checkArgument(!duration.isNegative(), "Amount of time to increment the wait between each retry must not be negative"); - this.waitIncrement = unit.toMillis(duration); + this.waitIncrement = duration; return this; } @@ -407,20 +409,20 @@ public class Retry { } @Override - public NeedsBackOffFactor maxWait(long duration, TimeUnit unit) { + public NeedsBackOffFactor maxWait(Duration duration) { checkState(); - this.maxWait = unit.toMillis(duration); - Preconditions.checkArgument(maxWait >= initialWait, + this.maxWait = duration; + Preconditions.checkArgument(maxWait.compareTo(initialWait) >= 0, "Maximum wait between retries must not be less than the initial delay"); return this; } @Override - public BuilderDone logInterval(long duration, TimeUnit unit) { + public BuilderDone logInterval(Duration duration) { checkState(); - Preconditions.checkArgument(duration >= 0, + Preconditions.checkArgument(!duration.isNegative(), "The amount of time between logging retries must not be negative"); - this.logInterval = unit.toMillis(duration); + this.logInterval = duration; return this; } diff --git a/core/src/test/java/org/apache/accumulo/core/util/RetryTest.java b/core/src/test/java/org/apache/accumulo/core/util/RetryTest.java index ce1933324b..45992ec9b5 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/RetryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/RetryTest.java @@ -18,17 +18,12 @@ */ package org.apache.accumulo.core.util; -import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.HOURS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.apache.accumulo.core.util.Retry.NeedsLogInterval; import org.apache.accumulo.core.util.Retry.NeedsMaxWait; @@ -45,24 +40,23 @@ import org.slf4j.LoggerFactory; public class RetryTest { private Retry retry; - private static final long INITIAL_WAIT = 1000; - private static final long WAIT_INC = 1000; + private static final Duration INITIAL_WAIT = Duration.ofSeconds(1); + private static final Duration WAIT_INC = Duration.ofSeconds(1); private static final double BACKOFF_FACTOR = 1.0; private static final long MAX_RETRIES = 5; - private static final long LOG_INTERVAL = 1000; + private static final Duration LOG_INTERVAL = Duration.ofSeconds(1); private Retry unlimitedRetry; - private static final TimeUnit MS = MILLISECONDS; private static final Logger log = LoggerFactory.getLogger(RetryTest.class); @BeforeEach public void setup() { - retry = Retry.builder().maxRetries(MAX_RETRIES).retryAfter(INITIAL_WAIT, MS) - .incrementBy(WAIT_INC, MS).maxWait(MAX_RETRIES * WAIT_INC, MS).backOffFactor(BACKOFF_FACTOR) - .logInterval(LOG_INTERVAL, MS).createRetry(); - unlimitedRetry = Retry.builder().infiniteRetries().retryAfter(INITIAL_WAIT, MS) - .incrementBy(WAIT_INC, MS).maxWait(MAX_RETRIES * WAIT_INC, MS).backOffFactor(BACKOFF_FACTOR) - .logInterval(LOG_INTERVAL, MS).createRetry(); + retry = Retry.builder().maxRetries(MAX_RETRIES).retryAfter(INITIAL_WAIT).incrementBy(WAIT_INC) + .maxWait(WAIT_INC.multipliedBy(MAX_RETRIES)).backOffFactor(BACKOFF_FACTOR) + .logInterval(LOG_INTERVAL).createRetry(); + unlimitedRetry = Retry.builder().infiniteRetries().retryAfter(INITIAL_WAIT) + .incrementBy(WAIT_INC).maxWait(WAIT_INC.multipliedBy(MAX_RETRIES)) + .backOffFactor(BACKOFF_FACTOR).logInterval(LOG_INTERVAL).createRetry(); } @Test @@ -109,15 +103,15 @@ public class RetryTest { retry.setMaxRetries(MAX_RETRIES); retry.setStartWait(INITIAL_WAIT); retry.setWaitIncrement(WAIT_INC); - retry.setMaxWait(MAX_RETRIES * 1000); + retry.setMaxWait(Duration.ofSeconds(1).multipliedBy(MAX_RETRIES)); retry.setBackOffFactor(1); retry.setDoTimeJitter(false); - long currentWait = INITIAL_WAIT; + Duration currentWait = INITIAL_WAIT; for (int i = 1; i <= MAX_RETRIES; i++) { retry.sleep(currentWait); EasyMock.expectLastCall(); - currentWait += WAIT_INC; + currentWait = currentWait.plus(WAIT_INC); } EasyMock.replay(retry); @@ -136,9 +130,9 @@ public class RetryTest { retry.setMaxRetries(MAX_RETRIES); retry.setBackOffFactor(1.5); retry.setStartWait(INITIAL_WAIT); - long waitIncrement = 0, currentWait = INITIAL_WAIT; + Duration waitIncrement, currentWait = INITIAL_WAIT; retry.setWaitIncrement(WAIT_INC); - retry.setMaxWait(MAX_RETRIES * 128000); + retry.setMaxWait(Duration.ofSeconds(128).multipliedBy(MAX_RETRIES)); retry.setDoTimeJitter(false); double backOfFactor = 1.5, originalBackoff = 1.5; @@ -146,8 +140,13 @@ public class RetryTest { retry.sleep(currentWait); double waitFactor = backOfFactor; backOfFactor *= originalBackoff; - waitIncrement = (long) (Math.ceil(waitFactor * WAIT_INC)); - currentWait = Math.min(retry.getMaxWait(), INITIAL_WAIT + waitIncrement); + waitIncrement = Duration.ofMillis((long) (Math.ceil(waitFactor * WAIT_INC.toMillis()))); + Duration tempWait = INITIAL_WAIT.plus(waitIncrement); + if (tempWait.compareTo(retry.getMaxWait()) > 0) { + currentWait = retry.getMaxWait(); + } else { + currentWait = tempWait; + } EasyMock.expectLastCall(); } @@ -168,16 +167,16 @@ public class RetryTest { retry.setStartWait(INITIAL_WAIT); retry.setWaitIncrement(WAIT_INC); // Make the last retry not increment in length - retry.setMaxWait((MAX_RETRIES - 1) * 1000); + retry.setMaxWait(Duration.ofSeconds(MAX_RETRIES - 1)); retry.setBackOffFactor(1); retry.setDoTimeJitter(false); - long currentWait = INITIAL_WAIT; + Duration currentWait = INITIAL_WAIT; for (int i = 1; i <= MAX_RETRIES; i++) { retry.sleep(currentWait); EasyMock.expectLastCall(); if (i < MAX_RETRIES - 1) { - currentWait += WAIT_INC; + currentWait = currentWait.plus(WAIT_INC); } } @@ -247,94 +246,102 @@ public class RetryTest { @Test public void testInitialWait() { NeedsRetryDelay builder = Retry.builder().maxRetries(10); - builder.retryAfter(10, NANOSECONDS); - builder.retryAfter(10, MILLISECONDS); - builder.retryAfter(10, DAYS); - builder.retryAfter(0, NANOSECONDS); - builder.retryAfter(0, MILLISECONDS); - builder.retryAfter(0, DAYS); - - assertThrows(IllegalArgumentException.class, () -> builder.retryAfter(-1, NANOSECONDS), + builder.retryAfter(Duration.ofNanos(10)); + builder.retryAfter(Duration.ofMillis(10)); + builder.retryAfter(Duration.ofDays(10)); + builder.retryAfter(Duration.ofNanos(0)); + builder.retryAfter(Duration.ofMillis(0)); + builder.retryAfter(Duration.ofDays(0)); + + assertThrows(IllegalArgumentException.class, () -> builder.retryAfter(Duration.ofNanos(-1)), "Should not allow negative wait times"); } @Test public void testIncrementBy() { - NeedsTimeIncrement builder = Retry.builder().maxRetries(10).retryAfter(10, MILLISECONDS); - builder.incrementBy(10, DAYS); - builder.incrementBy(10, HOURS); - builder.incrementBy(10, NANOSECONDS); - builder.incrementBy(0, DAYS); - builder.incrementBy(0, HOURS); - builder.incrementBy(0, NANOSECONDS); - - assertThrows(IllegalArgumentException.class, () -> builder.incrementBy(-1, NANOSECONDS), + NeedsTimeIncrement builder = Retry.builder().maxRetries(10).retryAfter(Duration.ofMillis(10)); + builder.incrementBy(Duration.ofDays(10)); + builder.incrementBy(Duration.ofHours(10)); + builder.incrementBy(Duration.ofNanos(10)); + builder.incrementBy(Duration.ofDays(0)); + builder.incrementBy(Duration.ofHours(0)); + builder.incrementBy(Duration.ofNanos(0)); + + assertThrows(IllegalArgumentException.class, () -> builder.incrementBy(Duration.ofNanos(-1)), "Should not allow negative increments"); } @Test public void testMaxWait() { - NeedsMaxWait builder = - Retry.builder().maxRetries(10).retryAfter(15, MILLISECONDS).incrementBy(10, MILLISECONDS); - builder.maxWait(15, MILLISECONDS); - builder.maxWait(16, MILLISECONDS); + NeedsMaxWait builder = Retry.builder().maxRetries(10).retryAfter(Duration.ofMillis(15)) + .incrementBy(Duration.ofMillis(10)); + builder.maxWait(Duration.ofMillis(15)); + builder.maxWait(Duration.ofMillis(16)); - assertThrows(IllegalArgumentException.class, () -> builder.maxWait(14, MILLISECONDS), + assertThrows(IllegalArgumentException.class, () -> builder.maxWait(Duration.ofMillis(14)), "Max wait time should be greater than or equal to initial wait time"); } @Test public void testLogInterval() { - NeedsLogInterval builder = Retry.builder().maxRetries(10).retryAfter(15, MILLISECONDS) - .incrementBy(10, MILLISECONDS).maxWait(16, MINUTES).backOffFactor(1); - builder.logInterval(10, DAYS); - builder.logInterval(10, HOURS); - builder.logInterval(10, NANOSECONDS); - builder.logInterval(0, DAYS); - builder.logInterval(0, HOURS); - builder.logInterval(0, NANOSECONDS); - - assertThrows(IllegalArgumentException.class, () -> builder.logInterval(-1, NANOSECONDS), + NeedsLogInterval builder = Retry.builder().maxRetries(10).retryAfter(Duration.ofMillis(15)) + .incrementBy(Duration.ofMillis(10)).maxWait(Duration.ofMinutes(16)).backOffFactor(1); + builder.logInterval(Duration.ofDays(10)); + builder.logInterval(Duration.ofHours(10)); + builder.logInterval(Duration.ofNanos(10)); + builder.logInterval(Duration.ofDays(0)); + builder.logInterval(Duration.ofHours(0)); + builder.logInterval(Duration.ofNanos(0)); + + assertThrows(IllegalArgumentException.class, () -> builder.logInterval(Duration.ofNanos(-1)), "Log interval must not be negative"); } @Test public void properArgumentsInRetry() { - long maxRetries = 10, startWait = 50L, maxWait = 5000L, waitIncrement = 500L, - logInterval = 10000L; - RetryFactory factory = Retry.builder().maxRetries(maxRetries).retryAfter(startWait, MS) - .incrementBy(waitIncrement, MS).maxWait(maxWait, MS).backOffFactor(1) - .logInterval(logInterval, MS).createFactory(); + long maxRetries = 10; + Duration startWait = Duration.ofMillis(50); + Duration maxWait = Duration.ofMillis(5000); + Duration waitIncrement = Duration.ofMillis(500); + Duration logInterval = Duration.ofMillis(10000); + + RetryFactory factory = + Retry.builder().maxRetries(maxRetries).retryAfter(startWait).incrementBy(waitIncrement) + .maxWait(maxWait).backOffFactor(1).logInterval(logInterval).createFactory(); Retry retry = factory.createRetry(); assertEquals(maxRetries, retry.getMaxRetries()); - assertEquals(startWait, retry.getCurrentWait()); - assertEquals(maxWait, retry.getMaxWait()); - assertEquals(waitIncrement, retry.getWaitIncrement()); - assertEquals(logInterval, retry.getLogInterval()); + assertEquals(startWait.toMillis(), retry.getCurrentWait().toMillis()); + assertEquals(maxWait.toMillis(), retry.getMaxWait().toMillis()); + assertEquals(waitIncrement.toMillis(), retry.getWaitIncrement().toMillis()); + assertEquals(logInterval.toMillis(), retry.getLogInterval().toMillis()); } @Test public void properArgumentsInUnlimitedRetry() { - long startWait = 50L, maxWait = 5000L, waitIncrement = 500L, logInterval = 10000L; + Duration startWait = Duration.ofMillis(50); + Duration maxWait = Duration.ofSeconds(5); + Duration waitIncrement = Duration.ofMillis(500); + Duration logInterval = Duration.ofSeconds(10); double waitFactor = 1.0; - RetryFactory factory = Retry.builder().infiniteRetries().retryAfter(startWait, MS) - .incrementBy(waitIncrement, MS).maxWait(maxWait, MS).backOffFactor(waitFactor) - .logInterval(logInterval, MS).createFactory(); + + RetryFactory factory = + Retry.builder().infiniteRetries().retryAfter(startWait).incrementBy(waitIncrement) + .maxWait(maxWait).backOffFactor(waitFactor).logInterval(logInterval).createFactory(); Retry retry = factory.createRetry(); assertEquals(-1, retry.getMaxRetries()); - assertEquals(startWait, retry.getCurrentWait()); - assertEquals(maxWait, retry.getMaxWait()); - assertEquals(waitIncrement, retry.getWaitIncrement()); - assertEquals(logInterval, retry.getLogInterval()); + assertEquals(startWait.toMillis(), retry.getCurrentWait().toMillis()); + assertEquals(maxWait.toMillis(), retry.getMaxWait().toMillis()); + assertEquals(waitIncrement.toMillis(), retry.getWaitIncrement().toMillis()); + assertEquals(logInterval.toMillis(), retry.getLogInterval().toMillis()); } @Test public void testInfiniteRetryWithBackoff() throws InterruptedException { - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(500, MILLISECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMillis(500)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); for (int i = 0; i < 100; i++) { try { retry.waitForNextAttempt(log, i + ""); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java index 020e436af6..2d5d83afa3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.server.compaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.time.Duration.ofMillis; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.NeedsRetryDelay; @@ -78,8 +78,8 @@ public class RetryableThriftCall<T> { } else { builder = Retry.builder().maxRetries(maxNumRetries); } - this.retry = builder.retryAfter(start, MILLISECONDS).incrementBy(0, MILLISECONDS) - .maxWait(maxWaitTime, MILLISECONDS).backOffFactor(2).logInterval(1, TimeUnit.MINUTES) + this.retry = builder.retryAfter(Duration.ofMillis(start)).incrementBy(Duration.ZERO) + .maxWait(Duration.ofMillis(maxWaitTime)).backOffFactor(2).logInterval(Duration.ofMinutes(1)) .createRetry(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index bac4f13be5..d8c2b12830 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -30,6 +30,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1345,9 +1346,10 @@ public class Manager extends AbstractServer waitIncrement = 5; } - Retry tserverRetry = Retry.builder().maxRetries(retries).retryAfter(initialWait, SECONDS) - .incrementBy(waitIncrement, SECONDS).maxWait(maxWaitPeriod, SECONDS).backOffFactor(1) - .logInterval(30, SECONDS).createRetry(); + Retry tserverRetry = Retry.builder().maxRetries(retries) + .retryAfter(Duration.ofSeconds(initialWait)).incrementBy(Duration.ofSeconds(waitIncrement)) + .maxWait(Duration.ofSeconds(maxWaitPeriod)).backOffFactor(1) + .logInterval(Duration.ofSeconds(30)).createRetry(); log.info("Checking for tserver availability - need to reach {} servers. Have {}", minTserverCount, tserverSet.size()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 12227f2576..b9c9a39359 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -322,17 +322,17 @@ public class TabletServer extends AbstractServer implements TabletHostingServer aconf.getTimeInMillis(Property.TSERV_WAL_TOLERATED_MAXIMUM_WAIT_DURATION); final RetryFactory walCreationRetryFactory = Retry.builder().maxRetries(toleratedWalCreationFailures) - .retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS) - .incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS) - .maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).backOffFactor(1.5) - .logInterval(3, TimeUnit.MINUTES).createFactory(); + .retryAfter(Duration.ofMillis(walFailureRetryIncrement)) + .incrementBy(Duration.ofMillis(walFailureRetryIncrement)) + .maxWait(Duration.ofMillis(walFailureRetryMax)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createFactory(); // Tolerate infinite failures for the write, however backing off the same as for creation // failures. - final RetryFactory walWritingRetryFactory = Retry.builder().infiniteRetries() - .retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS) - .incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS) - .maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).backOffFactor(1.5) - .logInterval(3, TimeUnit.MINUTES).createFactory(); + final RetryFactory walWritingRetryFactory = + Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(walFailureRetryIncrement)) + .incrementBy(Duration.ofMillis(walFailureRetryIncrement)) + .maxWait(Duration.ofMillis(walFailureRetryMax)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createFactory(); logger = new TabletServerLogger(this, walMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walWritingRetryFactory, walMaxAge); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 719b8c34da..cd8832996b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -100,9 +101,9 @@ public class CompactionManager { long increment = Math.max(1, maxTimeBetweenChecks / 10); - var retryFactory = Retry.builder().infiniteRetries().retryAfter(increment, MILLISECONDS) - .incrementBy(increment, MILLISECONDS).maxWait(maxTimeBetweenChecks, MILLISECONDS) - .backOffFactor(1.07).logInterval(1, MINUTES).createFactory(); + var retryFactory = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(increment)) + .incrementBy(Duration.ofMillis(increment)).maxWait(Duration.ofMillis(maxTimeBetweenChecks)) + .backOffFactor(1.07).logInterval(Duration.ofMinutes(1)).createFactory(); var retry = retryFactory.createRetry(); Compactable last = null; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 7ff5b70e35..caed9081d4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -19,10 +19,9 @@ package org.apache.accumulo.tserver.session; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -243,9 +242,9 @@ public class SessionManager { static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session session) { if (!session.cleanup()) { - var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) - .incrementBy(25, MILLISECONDS).maxWait(5, SECONDS).backOffFactor(1.5) - .logInterval(1, MINUTES).createRetry(); + var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(5)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(1)).createRetry(); while (!deferredCleanupQueue.offer(session)) { if (session.cleanup()) {