This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 4b5234bd87 Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
4b5234bd87 is described below

commit 4b5234bd87a46bfcd686b3db9bda9adff753f556
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu May 16 14:54:05 2024 -0400

    Added ZK cleanup thread to Manager for Scan Server nodes (#4562)

    Closes #4559
---
 .../java/org/apache/accumulo/manager/Manager.java | 49 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 86a1dd71d3..84e8e68519 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -72,6 +72,7 @@ import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -174,7 +175,7 @@ public class Manager extends AbstractServer
   static final Logger log = LoggerFactory.getLogger(Manager.class);

   static final int ONE_SECOND = 1000;
-  private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
+  private static final long CLEANUP_INTERVAL_MINUTES = 5;
   static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
   private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
   private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
@@ -698,7 +699,7 @@ public class Manager extends AbstractServer
             log.error("Error cleaning up migrations", ex);
           }
         }
-        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
+        sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
       }
     }

@@ -740,6 +741,48 @@ public class Manager extends AbstractServer
     }
   }

+  private class ScanServerZKCleaner implements Runnable {
+
+    @Override
+    public void run() {
+
+      final ZooReaderWriter zrw = getContext().getZooReaderWriter();
+      final String sserverZNodePath = getContext().getZooKeeperRoot() + Constants.ZSSERVERS;
+
+      while (stillManager()) {
+        try {
+          for (String sserverClientAddress : zrw.getChildren(sserverZNodePath)) {
+
+            final String sServerZPath = sserverZNodePath + "/" + sserverClientAddress;
+            final var zLockPath = ServiceLock.path(sServerZPath);
+            ZcStat stat = new ZcStat();
+            byte[] lockData = ServiceLock.getLockData(getContext().getZooCache(), zLockPath, stat);
+
+            if (lockData == null) {
+              try {
+                log.debug("Deleting empty ScanServer ZK node {}", sServerZPath);
+                zrw.delete(sServerZPath);
+              } catch (KeeperException.NotEmptyException e) {
+                log.debug(
+                    "Failed to delete ScanServer ZK node {} its not empty, likely an expected race condition.",
+                    sServerZPath);
+              }
+            }
+          }
+        } catch (KeeperException e) {
+          log.error("Exception trying to delete empty scan server ZNodes, will retry", e);
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          log.error("Interrupted trying to delete empty scan server ZNodes, will retry", e);
+        } finally {
+          // sleep for 5 mins
+          sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
+        }
+      }
+    }
+
+  }
+
   private class StatusThread implements Runnable {

     private boolean goodStats() {
@@ -1118,6 +1161,8 @@ public class Manager extends AbstractServer

     tserverSet.startListeningForTabletServerChanges();

+    Threads.createThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start();
+
     try {
       blockForTservers();
     } catch (InterruptedException ex) {