This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch 1.10
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.10 by this push:
     new e9842290a5 Fix wait timeout logic for available tservers (#3231)
e9842290a5 is described below

commit e9842290a50233262604e38cd41570221185b6bc
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Mar 10 13:27:26 2023 -0500

    Fix wait timeout logic for available tservers (#3231)
    
    * Fix wait timeout logic for available tservers
    * Use static imports for TimeUnit durations
    
    Closes https://github.com/apache/accumulo/issues/3159
---
 .../java/org/apache/accumulo/master/Master.java    | 55 ++++++++++------------
 1 file changed, 25 insertions(+), 30 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 51c1b87740..9617032d82 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -17,6 +17,8 @@
 package org.apache.accumulo.master;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
@@ -36,7 +38,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -788,7 +789,7 @@ public class Master extends AccumuloServerContext
         return MasterGoalState.valueOf(new String(data));
       } catch (Exception e) {
         log.error("Problem getting real goal state from zookeeper: " + e);
-        sleepUninterruptibly(1, TimeUnit.SECONDS);
+        sleepUninterruptibly(1_000, MILLISECONDS);
       }
   }
 
@@ -939,7 +940,7 @@ public class Master extends AccumuloServerContext
             log.error("Error cleaning up migrations", ex);
           }
         }
-        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, TimeUnit.MILLISECONDS);
+        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
       }
     }
 
@@ -1084,7 +1085,7 @@ public class Master extends AccumuloServerContext
         } catch (Throwable t) {
           log.error("Error balancing tablets, will wait for " + WAIT_BETWEEN_ERRORS / ONE_SECOND
               + " (seconds) and then retry", t);
-          sleepUninterruptibly(WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS);
+          sleepUninterruptibly(WAIT_BETWEEN_ERRORS, MILLISECONDS);
         }
       }
     }
@@ -1186,7 +1187,7 @@ public class Master extends AccumuloServerContext
         // Since an unbounded thread pool is being used, rate limit how fast task are added to the
         // executor. This prevents the threads from growing large unless there are lots of
         // unresponsive tservers.
-        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
+        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS);
       }
       tp.submit(new Runnable() {
         @Override
@@ -1235,7 +1236,7 @@ public class Master extends AccumuloServerContext
     }
     tp.shutdown();
     try {
-      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
+      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
     } catch (InterruptedException e) {
       log.debug("Interrupted while fetching status");
     }
@@ -1363,7 +1364,7 @@ public class Master extends AccumuloServerContext
           log.info("Waiting for AuthenticationTokenKeyManager to be initialized");
           logged = true;
         }
-        sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        sleepUninterruptibly(200, MILLISECONDS);
       }
       // And log when we are initialized
       log.info("AuthenticationTokenSecretManager is initialized");
@@ -1388,7 +1389,7 @@ public class Master extends AccumuloServerContext
     masterLock.replaceLockData(address.getBytes());
 
     while (!clientService.isServing()) {
-      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      sleepUninterruptibly(100, MILLISECONDS);
     }
 
     // Start the daemon to scan the replication table and make units of work
@@ -1434,7 +1435,7 @@ public class Master extends AccumuloServerContext
     }
 
     while (clientService.isServing()) {
-      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      sleepUninterruptibly(500, MILLISECONDS);
     }
     log.info("Shutting down fate.");
     fate.shutdown();
@@ -1478,8 +1479,7 @@ public class Master extends AccumuloServerContext
    *           if interrupted while blocking, propagated for caller to handle.
    */
   private void blockForTservers() throws InterruptedException {
-
-    long waitStart = System.currentTimeMillis();
+    long waitStart = System.nanoTime();
 
     AccumuloConfiguration accConfig = serverConfig.getConfiguration();
     long minTserverCount = accConfig.getCount(Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
@@ -1490,7 +1490,6 @@ public class Master extends AccumuloServerContext
           tserverSet.size(), Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
       return;
     }
-
     long maxWait = accConfig.getTimeInMillis(Property.MASTER_STARTUP_TSERVER_AVAIL_MAX_WAIT);
 
     if (maxWait <= 0) {
@@ -1499,14 +1498,12 @@ public class Master extends AccumuloServerContext
       maxWait = Long.MAX_VALUE;
     }
 
-    // honor Retry condition that initial wait < max wait, otherwise use small value to allow thread
-    // yield to happen
-    long initialWait = Math.min(50, maxWait / 2);
+    long retries = 10;
+    long waitPeriod = maxWait / retries;
 
-    Retry tserverRetry =
-        Retry.builder().infiniteRetries().retryAfter(initialWait, TimeUnit.MILLISECONDS)
-            .incrementBy(15_000, TimeUnit.MILLISECONDS).maxWait(maxWait, TimeUnit.MILLISECONDS)
-            .logInterval(30_000, TimeUnit.MILLISECONDS).createRetry();
+    Retry tserverRetry = Retry.builder().maxRetries(retries).retryAfter(waitPeriod, MILLISECONDS)
+        .incrementBy(0, MILLISECONDS).maxWait(waitPeriod, MILLISECONDS)
+        .logInterval(30_000, MILLISECONDS).createRetry();
 
     log.info("Checking for tserver availability - need to reach {} servers. Have {}",
         minTserverCount, tserverSet.size());
@@ -1521,27 +1518,25 @@ public class Master extends AccumuloServerContext
 
       // suppress last message once threshold reached.
       if (needTservers) {
-        log.info(
-            "Blocking for tserver availability - need to reach {} servers. Have {}"
-                + " Time spent blocking {} sec.",
+        tserverRetry.logRetry(log, String.format(
+            "Blocking for tserver availability - need to reach %s servers. Have %s Time spent blocking %s seconds.",
             minTserverCount, tserverSet.size(),
-            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart));
+            NANOSECONDS.toSeconds(System.nanoTime() - waitStart)));
       }
+      tserverRetry.useRetry();
     }
 
     if (tserverSet.size() < minTserverCount) {
       log.warn(
           "tserver availability check time expired - continuing. Requested {}, have {} tservers on line. "
-              + " Time waiting {} ms",
-          tserverSet.size(), minTserverCount,
-          TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart));
+              + " Time waiting {} sec",
+          tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
 
     } else {
       log.info(
           "tserver availability check completed. Requested {}, have {} tservers on line. "
-              + " Time waiting {} ms",
-          tserverSet.size(), minTserverCount,
-          TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - waitStart));
+              + " Time waiting {} sec",
+          tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
     }
   }
 
@@ -1639,7 +1634,7 @@ public class Master extends AccumuloServerContext
 
       masterLock.tryToCancelAsyncLockOrUnlock();
 
-      sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, TimeUnit.MILLISECONDS);
+      sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS);
     }
 
     setMasterState(MasterState.HAVE_LOCK);

Reply via email to