This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 34d27c9f0e Created common method for creating non-HA server zk path
(#5225)
34d27c9f0e is described below
commit 34d27c9f0e89a5dac4af368dfc9d98c159fb0878
Author: Dave Marion <[email protected]>
AuthorDate: Mon Jan 13 08:43:26 2025 -0500
Created common method for creating non-HA server zk path (#5225)
For HA servers (Manager, Monitor, etc.) the ZK node is created at
instance initialization time. For non-HA servers (compactor, sserver,
tserver) the paths are created when the server is started. Each
server implementation had its own copy of the code to do this; this
commit moves that code into a common location.
---
.../accumulo/core/lock/ServiceLockSupport.java | 64 ++++++++++++++++------
.../org/apache/accumulo/compactor/Compactor.java | 24 ++------
.../apache/accumulo/gc/SimpleGarbageCollector.java | 3 +-
.../java/org/apache/accumulo/manager/Manager.java | 5 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 3 +-
.../org/apache/accumulo/tserver/ScanServer.java | 26 ++-------
.../org/apache/accumulo/tserver/TabletServer.java | 24 ++------
7 files changed, 70 insertions(+), 79 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
index 3129511b5e..eb70d8e84c 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
@@ -21,16 +21,45 @@ package org.apache.accumulo.core.lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
+import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.util.Halt;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServiceLockSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(ServiceLockSupport.class);
+
+ /**
+ * Ensures that the resource group node in ZooKeeper is created for this
server
+ */
+ public static void createNonHaServiceLockPath(Type server, ZooReaderWriter
zrw,
+ ServiceLockPath slp) throws KeeperException, InterruptedException {
+ // The ServiceLockPath contains a resource group in the path which is not
created
+ // at initialization time. If it does not exist, then create it.
+ String rgPath = slp.toString().substring(0, slp.toString().lastIndexOf("/"
+ slp.getServer()));
+ LOG.debug("Creating {} resource group path in zookeeper: {}", server,
rgPath);
+ try {
+ zrw.mkdirs(rgPath);
+ zrw.putPersistentData(slp.toString(), new byte[] {},
NodeExistsPolicy.SKIP);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.NOAUTH) {
+ LOG.error("Failed to write to ZooKeeper. Ensure that"
+ + " accumulo.properties, specifically instance.secret, is
consistent.");
+ }
+ throw e;
+ }
+
+ }
+
/**
* Lock Watcher used by Highly Available services. These are services where
only instance is
* running at a time, but another backup service can be started that will be
used if the active
@@ -40,30 +69,29 @@ public class ServiceLockSupport {
private static final Logger LOG =
LoggerFactory.getLogger(HAServiceLockWatcher.class);
- private final String serviceName;
+ private final Type server;
private volatile boolean acquiredLock = false;
private volatile boolean failedToAcquireLock = false;
- public HAServiceLockWatcher(String serviceName) {
- this.serviceName = serviceName;
+ public HAServiceLockWatcher(Type server) {
+ this.server = server;
}
@Override
public void lostLock(LockLossReason reason) {
- Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason +
"), exiting!", -1);
+ Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "),
exiting!", -1);
}
@Override
public void unableToMonitorLockNode(final Exception e) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for
slf4j compatibility
- Halt.halt(-1,
- () -> LOG.error("FATAL: No longer able to monitor {} lock node",
serviceName, e));
+ Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock
node", server, e));
}
@Override
public synchronized void acquiredLock() {
- LOG.debug("Acquired {} lock", serviceName);
+ LOG.debug("Acquired {} lock", server);
if (acquiredLock || failedToAcquireLock) {
Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock, -1);
@@ -75,11 +103,11 @@ public class ServiceLockSupport {
@Override
public synchronized void failedToAcquireLock(Exception e) {
- LOG.warn("Failed to get {} lock", serviceName, e);
+ LOG.warn("Failed to get {} lock", server, e);
if (e instanceof NoAuthException) {
String msg =
- "Failed to acquire " + serviceName + " lock due to incorrect
ZooKeeper authentication.";
+ "Failed to acquire " + server + " lock due to incorrect ZooKeeper
authentication.";
LOG.error("{} Ensure instance.secret is consistent across Accumulo
configuration", msg, e);
Halt.halt(msg, -1);
}
@@ -96,7 +124,7 @@ public class ServiceLockSupport {
public synchronized void waitForChange() {
while (!acquiredLock && !failedToAcquireLock) {
try {
- LOG.info("{} lock held by someone else, waiting for a change in
state", serviceName);
+ LOG.info("{} lock held by someone else, waiting for a change in
state", server);
wait();
} catch (InterruptedException e) {
// empty
@@ -121,13 +149,13 @@ public class ServiceLockSupport {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceLockWatcher.class);
- private final String serviceName;
+ private final Type server;
private final Supplier<Boolean> shuttingDown;
- private final Consumer<String> lostLockAction;
+ private final Consumer<Type> lostLockAction;
- public ServiceLockWatcher(String serviceName, Supplier<Boolean>
shuttingDown,
- Consumer<String> lostLockAction) {
- this.serviceName = serviceName;
+ public ServiceLockWatcher(Type server, Supplier<Boolean> shuttingDown,
+ Consumer<Type> lostLockAction) {
+ this.server = server;
this.shuttingDown = shuttingDown;
this.lostLockAction = lostLockAction;
}
@@ -136,15 +164,15 @@ public class ServiceLockSupport {
public void lostLock(final LockLossReason reason) {
Halt.halt(1, () -> {
if (!shuttingDown.get()) {
- LOG.error("{} lost lock (reason = {}), exiting.", serviceName,
reason);
+ LOG.error("{} lost lock (reason = {}), exiting.", server, reason);
}
- lostLockAction.accept(serviceName);
+ lostLockAction.accept(server);
});
}
@Override
public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock,
exiting.", serviceName, e));
+ Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock,
exiting.", server, e));
}
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 20502cfcaa..e61ab5a112 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -80,6 +81,7 @@ import
org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
@@ -272,27 +274,13 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
protected void announceExistence(HostAndPort clientAddress)
throws KeeperException, InterruptedException {
- ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
-
+ final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
final ServiceLockPath path =
getContext().getServerPaths().createCompactorPath(getResourceGroup(),
clientAddress);
- // The ServiceLockPath contains a resource group in the path which is not
created
- // at initialization time. If it does not exist, then create it.
- final String compactorGroupPath =
- path.toString().substring(0, path.toString().lastIndexOf("/" +
path.getServer()));
- LOG.debug("Creating compactor resource group path in zookeeper: {}",
compactorGroupPath);
- try {
- zoo.mkdirs(compactorGroupPath);
- zoo.putPersistentData(path.toString(), new byte[] {},
NodeExistsPolicy.SKIP);
- } catch (KeeperException.NoAuthException e) {
- LOG.error("Failed to write to ZooKeeper. Ensure that"
- + " accumulo.properties, specifically instance.secret, is
consistent.");
- throw e;
- }
-
+ ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path);
compactorLock = new ServiceLock(getContext().getZooSession(), path,
compactorId);
- LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
- (name) ->
getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
+ LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false,
+ (type) ->
getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
try {
for (int i = 0; i < 25; i++) {
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index efef0dd72c..62449eabb6 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -35,6 +35,7 @@ import java.util.stream.IntStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -370,7 +371,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
UUID zooLockUUID = UUID.randomUUID();
gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID);
- HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
+ HAServiceLockWatcher gcLockWatcher = new
HAServiceLockWatcher(Type.GARBAGE_COLLECTOR);
while (true) {
gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID,
addr.toString(), ThriftService.GC,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 17ae1d10d7..eb22227bb9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
@@ -1499,14 +1500,12 @@ public class Manager extends AbstractServer
ServiceDescriptors descriptors = new ServiceDescriptors();
descriptors.addService(new ServiceDescriptor(zooLockUUID,
ThriftService.MANAGER,
managerClientAddress, this.getResourceGroup()));
-
ServiceLockData sld = new ServiceLockData(descriptors);
-
managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
+ HAServiceLockWatcher managerLockWatcher = new
HAServiceLockWatcher(Type.MANAGER);
while (true) {
- HAServiceLockWatcher managerLockWatcher = new
HAServiceLockWatcher("manager");
managerLock.lock(managerLockWatcher, sld);
managerLockWatcher.waitForChange();
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index cabd4ef957..b728b8a2b7 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -45,6 +45,7 @@ import jakarta.inject.Singleton;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
@@ -741,7 +742,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
// Get a ZooLock for the monitor
UUID zooLockUUID = UUID.randomUUID();
monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath,
zooLockUUID);
- HAServiceLockWatcher monitorLockWatcher = new
HAServiceLockWatcher("monitor");
+ HAServiceLockWatcher monitorLockWatcher = new
HAServiceLockWatcher(Type.MONITOR);
while (true) {
monitorLock.lock(monitorLockWatcher,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 9d414477b4..8a409d566b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -50,6 +50,7 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -75,6 +76,7 @@ import
org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -119,7 +121,6 @@ import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletBase;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.zookeeper.KeeperException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -326,31 +327,16 @@ public class ScanServer extends AbstractServer
* Set up nodes and locks in ZooKeeper for this Compactor
*/
private ServiceLock announceExistence() {
- ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
+ final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
try {
final ServiceLockPath zLockPath =
context.getServerPaths().createScanServerPath(getResourceGroup(),
clientAddress);
+ ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo,
zLockPath);
serverLockUUID = UUID.randomUUID();
- // The ServiceLockPath contains a resource group in the path which is
not created
- // at initialization time. If it does not exist, then create it.
- String sserverGroupPath = zLockPath.toString().substring(0,
- zLockPath.toString().lastIndexOf("/" + zLockPath.getServer()));
- LOG.debug("Creating sserver resource group path in zookeeper: {}",
sserverGroupPath);
- try {
- zoo.mkdirs(sserverGroupPath);
- // Old zk nodes can be cleaned up by ZooZap
- zoo.putPersistentData(zLockPath.toString(), new byte[] {},
NodeExistsPolicy.SKIP);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.NOAUTH) {
- LOG.error("Failed to write to ZooKeeper. Ensure that"
- + " accumulo.properties, specifically instance.secret, is
consistent.");
- }
- throw e;
- }
scanServerLock = new ServiceLock(getContext().getZooSession(),
zLockPath, serverLockUUID);
- LockWatcher lw = new ServiceLockWatcher("scan server", () ->
serverStopRequested,
- (name) ->
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
+ LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () ->
serverStopRequested,
+ (type) ->
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0],
NodeExistsPolicy.SKIP);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 08d83f148d..aea44fcdaf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
import org.apache.accumulo.core.clientImpl.DurabilityImpl;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -85,6 +86,7 @@ import
org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockSupport;
import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.manager.thrift.Compacting;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
@@ -487,31 +489,17 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
}
private void announceExistence() {
- ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
+ final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
try {
final ServiceLockPath zLockPath =
context.getServerPaths().createTabletServerPath(getResourceGroup(),
clientAddress);
- // The ServiceLockPath contains a resource group in the path which is
not created
- // at initialization time. If it does not exist, then create it.
- String tserverGroupPath = zLockPath.toString().substring(0,
- zLockPath.toString().lastIndexOf("/" + zLockPath.getServer()));
- log.debug("Creating tserver resource group path in zookeeper: {}",
tserverGroupPath);
- try {
- zoo.mkdirs(tserverGroupPath);
- zoo.putPersistentData(zLockPath.toString(), new byte[] {},
NodeExistsPolicy.SKIP);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.NOAUTH) {
- log.error("Failed to write to ZooKeeper. Ensure that"
- + " accumulo.properties, specifically instance.secret, is
consistent.");
- }
- throw e;
- }
+ ServiceLockSupport.createNonHaServiceLockPath(Type.TABLET_SERVER, zoo,
zLockPath);
UUID tabletServerUUID = UUID.randomUUID();
tabletServerLock = new ServiceLock(getContext().getZooSession(),
zLockPath, tabletServerUUID);
- LockWatcher lw = new ServiceLockWatcher("tablet server", () ->
serverStopRequested,
- (name) ->
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
+ LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () ->
serverStopRequested,
+ (type) ->
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0],
NodeExistsPolicy.SKIP);