This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch revert-3231-bugfix/change-wait-logic in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 783f0794f5215114ff3666378a3a46ff57ddbc1f Author: EdColeman <d...@etcoleman.com> AuthorDate: Fri Mar 10 19:22:04 2023 +0000 Revert "Fix wait timeout logic for available tservers (#3231)" This reverts commit e9842290a50233262604e38cd41570221185b6bc. --- .../java/org/apache/accumulo/master/Master.java | 55 ++++++++++++---------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 9617032d82..51c1b87740 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -17,8 +17,6 @@ package org.apache.accumulo.master; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; @@ -38,6 +36,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -789,7 +788,7 @@ public class Master extends AccumuloServerContext return MasterGoalState.valueOf(new String(data)); } catch (Exception e) { log.error("Problem getting real goal state from zookeeper: " + e); - sleepUninterruptibly(1_000, MILLISECONDS); + sleepUninterruptibly(1, TimeUnit.SECONDS); } } @@ -940,7 +939,7 @@ public class Master extends AccumuloServerContext log.error("Error cleaning up migrations", ex); } } - sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS); + sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, TimeUnit.MILLISECONDS); } } @@ -1085,7 +1084,7 @@ public 
class Master extends AccumuloServerContext } catch (Throwable t) { log.error("Error balancing tablets, will wait for " + WAIT_BETWEEN_ERRORS / ONE_SECOND + " (seconds) and then retry", t); - sleepUninterruptibly(WAIT_BETWEEN_ERRORS, MILLISECONDS); + sleepUninterruptibly(WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS); } } } @@ -1187,7 +1186,7 @@ public class Master extends AccumuloServerContext // Since an unbounded thread pool is being used, rate limit how fast task are added to the // executor. This prevents the threads from growing large unless there are lots of // unresponsive tservers. - sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS); + sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS); } tp.submit(new Runnable() { @Override @@ -1236,7 +1235,7 @@ public class Master extends AccumuloServerContext } tp.shutdown(); try { - tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS); + tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.debug("Interrupted while fetching status"); } @@ -1364,7 +1363,7 @@ public class Master extends AccumuloServerContext log.info("Waiting for AuthenticationTokenKeyManager to be initialized"); logged = true; } - sleepUninterruptibly(200, MILLISECONDS); + sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } // And log when we are initialized log.info("AuthenticationTokenSecretManager is initialized"); @@ -1389,7 +1388,7 @@ public class Master extends AccumuloServerContext masterLock.replaceLockData(address.getBytes()); while (!clientService.isServing()) { - sleepUninterruptibly(100, MILLISECONDS); + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } // Start the daemon to scan the replication table and make units of work @@ -1435,7 +1434,7 @@ public class Master extends AccumuloServerContext } while (clientService.isServing()) { - sleepUninterruptibly(500, MILLISECONDS); + sleepUninterruptibly(500, 
TimeUnit.MILLISECONDS); } log.info("Shutting down fate."); fate.shutdown(); @@ -1479,7 +1478,8 @@ public class Master extends AccumuloServerContext * if interrupted while blocking, propagated for caller to handle. */ private void blockForTservers() throws InterruptedException { - long waitStart = System.nanoTime(); + + long waitStart = System.currentTimeMillis(); AccumuloConfiguration accConfig = serverConfig.getConfiguration(); long minTserverCount = accConfig.getCount(Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT); @@ -1490,6 +1490,7 @@ public class Master extends AccumuloServerContext tserverSet.size(), Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey()); return; } + long maxWait = accConfig.getTimeInMillis(Property.MASTER_STARTUP_TSERVER_AVAIL_MAX_WAIT); if (maxWait <= 0) { @@ -1498,12 +1499,14 @@ public class Master extends AccumuloServerContext maxWait = Long.MAX_VALUE; } - long retries = 10; - long waitPeriod = maxWait / retries; + // honor Retry condition that initial wait < max wait, otherwise use small value to allow thread + // yield to happen + long initialWait = Math.min(50, maxWait / 2); - Retry tserverRetry = Retry.builder().maxRetries(retries).retryAfter(waitPeriod, MILLISECONDS) - .incrementBy(0, MILLISECONDS).maxWait(waitPeriod, MILLISECONDS) - .logInterval(30_000, MILLISECONDS).createRetry(); + Retry tserverRetry = + Retry.builder().infiniteRetries().retryAfter(initialWait, TimeUnit.MILLISECONDS) + .incrementBy(15_000, TimeUnit.MILLISECONDS).maxWait(maxWait, TimeUnit.MILLISECONDS) + .logInterval(30_000, TimeUnit.MILLISECONDS).createRetry(); log.info("Checking for tserver availability - need to reach {} servers. Have {}", minTserverCount, tserverSet.size()); @@ -1518,25 +1521,27 @@ public class Master extends AccumuloServerContext // suppress last message once threshold reached. if (needTservers) { - tserverRetry.logRetry(log, String.format( - "Blocking for tserver availability - need to reach %s servers. 
Have %s Time spent blocking %s seconds.", + log.info( + "Blocking for tserver availability - need to reach {} servers. Have {}" + + " Time spent blocking {} sec.", minTserverCount, tserverSet.size(), - NANOSECONDS.toSeconds(System.nanoTime() - waitStart))); + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart)); } - tserverRetry.useRetry(); } if (tserverSet.size() < minTserverCount) { log.warn( "tserver availability check time expired - continuing. Requested {}, have {} tservers on line. " - + " Time waiting {} sec", - tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); + + " Time waiting {} ms", + tserverSet.size(), minTserverCount, + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart)); } else { log.info( "tserver availability check completed. Requested {}, have {} tservers on line. " - + " Time waiting {} sec", - tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); + + " Time waiting {} ms", + tserverSet.size(), minTserverCount, + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart)); } } @@ -1634,7 +1639,7 @@ public class Master extends AccumuloServerContext masterLock.tryToCancelAsyncLockOrUnlock(); - sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS); + sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, TimeUnit.MILLISECONDS); } setMasterState(MasterState.HAVE_LOCK);