This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch revert-3231-bugfix/change-wait-logic
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 783f0794f5215114ff3666378a3a46ff57ddbc1f
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Fri Mar 10 19:22:04 2023 +0000

    Revert "Fix wait timeout logic for available tservers (#3231)"
    
    This reverts commit e9842290a50233262604e38cd41570221185b6bc.
---
 .../java/org/apache/accumulo/master/Master.java    | 55 ++++++++++++----------
 1 file changed, 30 insertions(+), 25 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9617032d82..51c1b87740 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -17,8 +17,6 @@
 package org.apache.accumulo.master;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
@@ -38,6 +36,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -789,7 +788,7 @@ public class Master extends AccumuloServerContext
         return MasterGoalState.valueOf(new String(data));
       } catch (Exception e) {
         log.error("Problem getting real goal state from zookeeper: " + e);
-        sleepUninterruptibly(1_000, MILLISECONDS);
+        sleepUninterruptibly(1, TimeUnit.SECONDS);
       }
   }
 
@@ -940,7 +939,7 @@ public class Master extends AccumuloServerContext
             log.error("Error cleaning up migrations", ex);
           }
         }
-        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
+        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, 
TimeUnit.MILLISECONDS);
       }
     }
 
@@ -1085,7 +1084,7 @@ public class Master extends AccumuloServerContext
         } catch (Throwable t) {
           log.error("Error balancing tablets, will wait for " + 
WAIT_BETWEEN_ERRORS / ONE_SECOND
               + " (seconds) and then retry", t);
-          sleepUninterruptibly(WAIT_BETWEEN_ERRORS, MILLISECONDS);
+          sleepUninterruptibly(WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS);
         }
       }
     }
@@ -1187,7 +1186,7 @@ public class Master extends AccumuloServerContext
         // Since an unbounded thread pool is being used, rate limit how fast 
task are added to the
         // executor. This prevents the threads from growing large unless there 
are lots of
         // unresponsive tservers.
-        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS);
+        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), 
TimeUnit.MILLISECONDS);
       }
       tp.submit(new Runnable() {
         @Override
@@ -1236,7 +1235,7 @@ public class Master extends AccumuloServerContext
     }
     tp.shutdown();
     try {
-      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
+      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), 
TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       log.debug("Interrupted while fetching status");
     }
@@ -1364,7 +1363,7 @@ public class Master extends AccumuloServerContext
           log.info("Waiting for AuthenticationTokenKeyManager to be 
initialized");
           logged = true;
         }
-        sleepUninterruptibly(200, MILLISECONDS);
+        sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
       }
       // And log when we are initialized
       log.info("AuthenticationTokenSecretManager is initialized");
@@ -1389,7 +1388,7 @@ public class Master extends AccumuloServerContext
     masterLock.replaceLockData(address.getBytes());
 
     while (!clientService.isServing()) {
-      sleepUninterruptibly(100, MILLISECONDS);
+      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     }
 
     // Start the daemon to scan the replication table and make units of work
@@ -1435,7 +1434,7 @@ public class Master extends AccumuloServerContext
     }
 
     while (clientService.isServing()) {
-      sleepUninterruptibly(500, MILLISECONDS);
+      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
     }
     log.info("Shutting down fate.");
     fate.shutdown();
@@ -1479,7 +1478,8 @@ public class Master extends AccumuloServerContext
    *           if interrupted while blocking, propagated for caller to handle.
    */
   private void blockForTservers() throws InterruptedException {
-    long waitStart = System.nanoTime();
+
+    long waitStart = System.currentTimeMillis();
 
     AccumuloConfiguration accConfig = serverConfig.getConfiguration();
     long minTserverCount = 
accConfig.getCount(Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
@@ -1490,6 +1490,7 @@ public class Master extends AccumuloServerContext
           tserverSet.size(), 
Property.MASTER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
       return;
     }
+
     long maxWait = 
accConfig.getTimeInMillis(Property.MASTER_STARTUP_TSERVER_AVAIL_MAX_WAIT);
 
     if (maxWait <= 0) {
@@ -1498,12 +1499,14 @@ public class Master extends AccumuloServerContext
       maxWait = Long.MAX_VALUE;
     }
 
-    long retries = 10;
-    long waitPeriod = maxWait / retries;
+    // honor Retry condition that initial wait < max wait, otherwise use small 
value to allow thread
+    // yield to happen
+    long initialWait = Math.min(50, maxWait / 2);
 
-    Retry tserverRetry = 
Retry.builder().maxRetries(retries).retryAfter(waitPeriod, MILLISECONDS)
-        .incrementBy(0, MILLISECONDS).maxWait(waitPeriod, MILLISECONDS)
-        .logInterval(30_000, MILLISECONDS).createRetry();
+    Retry tserverRetry =
+        Retry.builder().infiniteRetries().retryAfter(initialWait, 
TimeUnit.MILLISECONDS)
+            .incrementBy(15_000, TimeUnit.MILLISECONDS).maxWait(maxWait, 
TimeUnit.MILLISECONDS)
+            .logInterval(30_000, TimeUnit.MILLISECONDS).createRetry();
 
     log.info("Checking for tserver availability - need to reach {} servers. 
Have {}",
         minTserverCount, tserverSet.size());
@@ -1518,25 +1521,27 @@ public class Master extends AccumuloServerContext
 
       // suppress last message once threshold reached.
       if (needTservers) {
-        tserverRetry.logRetry(log, String.format(
-            "Blocking for tserver availability - need to reach %s servers. 
Have %s Time spent blocking %s seconds.",
+        log.info(
+            "Blocking for tserver availability - need to reach {} servers. 
Have {}"
+                + " Time spent blocking {} sec.",
             minTserverCount, tserverSet.size(),
-            NANOSECONDS.toSeconds(System.nanoTime() - waitStart)));
+            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
waitStart));
       }
-      tserverRetry.useRetry();
     }
 
     if (tserverSet.size() < minTserverCount) {
       log.warn(
           "tserver availability check time expired - continuing. Requested {}, 
have {} tservers on line. "
-              + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, 
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
+              + " Time waiting {} ms",
+          tserverSet.size(), minTserverCount,
+          TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
waitStart));
 
     } else {
       log.info(
           "tserver availability check completed. Requested {}, have {} 
tservers on line. "
-              + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, 
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
+              + " Time waiting {} ms",
+          tserverSet.size(), minTserverCount,
+          TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
waitStart));
     }
   }
 
@@ -1634,7 +1639,7 @@ public class Master extends AccumuloServerContext
 
       masterLock.tryToCancelAsyncLockOrUnlock();
 
-      sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS);
+      sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, 
TimeUnit.MILLISECONDS);
     }
 
     setMasterState(MasterState.HAVE_LOCK);

Reply via email to