This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 4b5234bd87 Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
4b5234bd87 is described below

commit 4b5234bd87a46bfcd686b3db9bda9adff753f556
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu May 16 14:54:05 2024 -0400

    Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
    
    Closes #4559
---
 .../java/org/apache/accumulo/manager/Manager.java  | 49 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 86a1dd71d3..84e8e68519 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -72,6 +72,7 @@ import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -174,7 +175,7 @@ public class Manager extends AbstractServer
   static final Logger log = LoggerFactory.getLogger(Manager.class);
 
   static final int ONE_SECOND = 1000;
-  private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
+  private static final long CLEANUP_INTERVAL_MINUTES = 5;
   static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
   private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
   private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
@@ -698,7 +699,7 @@ public class Manager extends AbstractServer
             log.error("Error cleaning up migrations", ex);
           }
         }
-        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
+        sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
       }
     }
 
@@ -740,6 +741,48 @@ public class Manager extends AbstractServer
     }
   }
 
+  private class ScanServerZKCleaner implements Runnable {
+
+    @Override
+    public void run() {
+
+      final ZooReaderWriter zrw = getContext().getZooReaderWriter();
+      final String sserverZNodePath = getContext().getZooKeeperRoot() + Constants.ZSSERVERS;
+
+      while (stillManager()) {
+        try {
+          for (String sserverClientAddress : zrw.getChildren(sserverZNodePath)) {
+
+            final String sServerZPath = sserverZNodePath + "/" + sserverClientAddress;
+            final var zLockPath = ServiceLock.path(sServerZPath);
+            ZcStat stat = new ZcStat();
+            byte[] lockData = ServiceLock.getLockData(getContext().getZooCache(), zLockPath, stat);
+
+            if (lockData == null) {
+              try {
+                log.debug("Deleting empty ScanServer ZK node {}", sServerZPath);
+                zrw.delete(sServerZPath);
+              } catch (KeeperException.NotEmptyException e) {
+                log.debug(
+                    "Failed to delete ScanServer ZK node {} its not empty, likely an expected race condition.",
+                    sServerZPath);
+              }
+            }
+          }
+        } catch (KeeperException e) {
+          log.error("Exception trying to delete empty scan server ZNodes, will retry", e);
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          log.error("Interrupted trying to delete empty scan server ZNodes, will retry", e);
+        } finally {
+          // sleep for 5 mins
+          sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
+        }
+      }
+    }
+
+  }
+
   private class StatusThread implements Runnable {
 
     private boolean goodStats() {
@@ -1118,6 +1161,8 @@ public class Manager extends AbstractServer
 
     tserverSet.startListeningForTabletServerChanges();
 
+    Threads.createThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start();
+
     try {
       blockForTservers();
     } catch (InterruptedException ex) {

Reply via email to