This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 663369f517 Remove TableObserver and table state cache (#5422)
663369f517 is described below
commit 663369f5175218481456a541be5bd70ab8bc31b7
Author: Christopher Tubbs <[email protected]>
AuthorDate: Mon Mar 24 21:31:43 2025 -0400
Remove TableObserver and table state cache (#5422)
* Remove observer registry and table state cache from TableManager
* Move TableStateWatcher to its own file
* Remove TableObserver interface (only used by Manager) and replace with
  manager explicitly registering TableStateWatcher and its handler code
  with ZooCache
---
.../accumulo/server/tables/TableManager.java | 149 ++-------------------
.../accumulo/server/tables/TableObserver.java | 30 -----
.../main/java/org/apache/accumulo/gc/GCRun.java | 1 -
.../java/org/apache/accumulo/manager/Manager.java | 26 ++--
.../apache/accumulo/manager/TableStateWatcher.java | 85 ++++++++++++
5 files changed, 108 insertions(+), 183 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index adab9869f7..11b3002f7e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@ -20,10 +20,7 @@ package org.apache.accumulo.server.tables;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -40,15 +37,12 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.tables.TableNameUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
import org.apache.accumulo.server.conf.store.PropStore;
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.apache.accumulo.server.util.PropUtil;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,20 +53,12 @@ public class TableManager {
private static final Logger log =
LoggerFactory.getLogger(TableManager.class);
private static final byte[] ZERO_BYTE = {'0'};
- private final Set<TableObserver> observers = Collections.synchronizedSet(new
HashSet<>());
- private final Map<TableId,TableState> tableStateCache =
- Collections.synchronizedMap(new HashMap<>());
-
private final ServerContext context;
private final ZooReaderWriter zoo;
public TableManager(ServerContext context) {
this.context = context;
this.zoo = context.getZooSession().asReaderWriter();
-
- // add our Watcher to the shared ZooCache
- context.getZooCache().addZooCacheWatcher(new TableStateWatcher());
- updateTableStateCache();
}
public void prepareNewNamespaceState(NamespaceId namespaceId, String
namespace,
@@ -110,10 +96,6 @@ public class TableManager {
}
}
- public TableState getTableState(TableId tableId) {
- return tableStateCache.get(tableId);
- }
-
public synchronized void transitionTableState(final TableId tableId, final
TableState newState,
final EnumSet<TableState> expectedCurrStates) {
Preconditions.checkArgument(newState != TableState.UNKNOWN);
@@ -161,40 +143,25 @@ public class TableManager {
}
}
- private void updateTableStateCache() {
- synchronized (tableStateCache) {
- for (String tableId :
context.getZooCache().getChildren(Constants.ZTABLES)) {
- if (context.getZooCache().get(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE)
- != null) {
- updateTableStateCache(TableId.of(tableId));
- }
- }
- }
- }
-
- public TableState updateTableStateCache(TableId tableId) {
- synchronized (tableStateCache) {
- TableState tState = TableState.UNKNOWN;
- byte[] data =
- context.getZooCache().get(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE);
- if (data != null) {
- String sState = new String(data, UTF_8);
- try {
- tState = TableState.valueOf(sState);
- } catch (IllegalArgumentException e) {
- log.error("Unrecognized state for table with tableId={}: {}",
tableId, sState);
- }
- tableStateCache.put(tableId, tState);
+ public TableState getTableState(TableId tableId) {
+ TableState tState = TableState.UNKNOWN;
+ byte[] data =
+ context.getZooCache().get(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE);
+ if (data != null) {
+ String sState = new String(data, UTF_8);
+ try {
+ tState = TableState.valueOf(sState);
+ } catch (IllegalArgumentException e) {
+ log.error("Unrecognized state for table with tableId={}: {}", tableId,
sState);
}
- return tState;
}
+ return tState;
}
public void addTable(TableId tableId, NamespaceId namespaceId, String
tableName)
throws KeeperException, InterruptedException, NamespaceNotFoundException
{
prepareNewTableState(tableId, namespaceId, tableName, TableState.NEW,
NodeExistsPolicy.OVERWRITE);
- updateTableStateCache(tableId);
}
public void cloneTable(TableId srcTableId, TableId tableId, String tableName,
@@ -209,100 +176,12 @@ public class TableManager {
PropUtil.setProperties(context, TablePropKey.of(tableId), propertiesToSet);
PropUtil.removeProperties(context, TablePropKey.of(tableId),
propertiesToExclude);
-
- updateTableStateCache(tableId);
}
public void removeTable(TableId tableId) throws KeeperException,
InterruptedException {
- synchronized (tableStateCache) {
- tableStateCache.remove(tableId);
- zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE,
- NodeMissingPolicy.SKIP);
- zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId,
NodeMissingPolicy.SKIP);
- }
- }
-
- public boolean addObserver(TableObserver to) {
- synchronized (observers) {
- synchronized (tableStateCache) {
- to.initialize();
- return observers.add(to);
- }
- }
- }
-
- private class TableStateWatcher implements ZooCacheWatcher {
- @Override
- public void accept(WatchedEvent event) {
- if (log.isTraceEnabled()) {
- log.trace("{}", event);
- }
- final String zPath = event.getPath();
- final EventType zType = event.getType();
-
- TableId tableId = null;
-
- if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) {
- String suffix = zPath.substring(Constants.ZTABLES.length() + 1);
- if (suffix.contains("/")) {
- String[] sa = suffix.split("/", 2);
- if (Constants.ZTABLE_STATE.equals("/" + sa[1])) {
- tableId = TableId.of(sa[0]);
- }
- }
- if (tableId == null) {
- log.trace("Unhandled path {}", event);
- return;
- }
- }
-
- switch (zType) {
- case NodeChildrenChanged:
- // According to documentation we should not receive this event now
- // that ZooCache is using Persistent Watchers. Not logging an error
here.
- // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475
we
- // may receive this event (Fixed in 3.9.0)
- break;
- case NodeCreated:
- case NodeDataChanged:
- // state transition
- if (tableId != null) {
- TableState tState = updateTableStateCache(tableId);
- log.debug("State transition to {} @ {}", tState, event);
- synchronized (observers) {
- for (TableObserver to : observers) {
- to.stateChanged(tableId, tState);
- }
- }
- }
- break;
- case NodeDeleted:
- if (zPath != null && tableId != null
- && (zPath.equals(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE)
- || zPath.equals(Constants.ZTABLES + "/" + tableId +
Constants.ZCONFIG)
- || zPath.equals(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_NAME))) {
- tableStateCache.remove(tableId);
- }
- break;
- case None:
- switch (event.getState()) {
- case Expired:
- log.trace("Session expired; {}", event);
- synchronized (observers) {
- for (TableObserver to : observers) {
- to.sessionExpired();
- }
- }
- break;
- case SyncConnected:
- default:
- log.trace("Ignored {}", event);
- }
- break;
- default:
- log.warn("Unhandled {}", event);
- }
- }
+ zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId +
Constants.ZTABLE_STATE,
+ NodeMissingPolicy.SKIP);
+ zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId,
NodeMissingPolicy.SKIP);
}
public void removeNamespace(NamespaceId namespaceId)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
deleted file mode 100644
index 6b20a3fb91..0000000000
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.tables;
-
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.manager.state.tables.TableState;
-
-public interface TableObserver {
- void initialize();
-
- void stateChanged(TableId tableId, TableState tState);
-
- void sessionExpired();
-}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 3a79855d8d..bb089c8885 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -341,7 +341,6 @@ public class GCRun implements GarbageCollectionEnvironment {
if (parts.length > 2) {
TableId tableId = TableId.of(parts[1]);
String tabletDir = parts[2];
- context.getTableManager().updateTableStateCache(tableId);
TableState tableState =
context.getTableManager().getTableState(tableId);
if (tableState != null && tableState != TableState.DELETING) {
// clone directories don't always exist
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 94ac0e76b5..4c6d75755b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -149,7 +149,6 @@ import
org.apache.accumulo.server.security.SecurityOperation;
import
org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager;
import
org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor;
import org.apache.accumulo.server.tables.TableManager;
-import org.apache.accumulo.server.tables.TableObserver;
import org.apache.accumulo.server.util.ScanServerMetadataEntries;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
import org.apache.accumulo.server.util.TableInfoUtil;
@@ -179,7 +178,7 @@ import io.opentelemetry.context.Scope;
* <p>
* The manager will also coordinate log recoveries and reports general status.
*/
-public class Manager extends AbstractServer implements
LiveTServerSet.Listener, TableObserver {
+public class Manager extends AbstractServer implements LiveTServerSet.Listener
{
static final Logger log = LoggerFactory.getLogger(Manager.class);
@@ -1174,7 +1173,14 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
recoveryManager = new RecoveryManager(this,
timeToCacheRecoveryWalExistence);
- context.getTableManager().addObserver(this);
+ context.getZooCache().addZooCacheWatcher(new TableStateWatcher((tableId,
event) -> {
+ TableState state = getTableManager().getTableState(tableId);
+ log.debug("Table state transition to {} @ {}", state, event);
+ nextEvent.event(tableId, "Table state in zookeeper changed for %s to
%s", tableId, state);
+ if (state == TableState.OFFLINE) {
+ clearMigrations(tableId);
+ }
+ }));
tableInformationStatusPool = ThreadPools.getServerThreadPools()
.createExecutorService(getConfiguration(),
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
@@ -1709,20 +1715,6 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
}
}
- @Override
- public void stateChanged(TableId tableId, TableState state) {
- nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s",
tableId, state);
- if (state == TableState.OFFLINE) {
- clearMigrations(tableId);
- }
- }
-
- @Override
- public void initialize() {}
-
- @Override
- public void sessionExpired() {}
-
public Set<TableId> onlineTables() {
Set<TableId> result = new HashSet<>();
if (getManagerState() != ManagerState.NORMAL) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java
new file mode 100644
index 0000000000..5b53e03d6c
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager;
+
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TableStateWatcher implements ZooCacheWatcher {
+
+ private final BiConsumer<TableId,WatchedEvent> stateChangeConsumer;
+
+ TableStateWatcher(BiConsumer<TableId,WatchedEvent> stateChangeConsumer) {
+ this.stateChangeConsumer = stateChangeConsumer;
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(TableStateWatcher.class);
+
+ @Override
+ public void accept(WatchedEvent event) {
+ log.trace("{}", event);
+ final String zPath = event.getPath();
+ final EventType zType = event.getType();
+
+ TableId tableId = null;
+
+ if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) {
+ String suffix = zPath.substring(Constants.ZTABLES.length() + 1);
+ if (suffix.contains("/")) {
+ String[] sa = suffix.split("/", 2);
+ if (Constants.ZTABLE_STATE.equals("/" + sa[1])) {
+ tableId = TableId.of(sa[0]);
+ }
+ }
+ if (tableId == null) {
+ log.trace("Unhandled path {}", event);
+ return;
+ }
+ }
+
+ switch (zType) {
+ case NodeCreated:
+ case NodeDataChanged:
+ // state transition
+ if (tableId != null) {
+ stateChangeConsumer.accept(tableId, event);
+ }
+ break;
+ case NodeChildrenChanged:
+ // According to documentation we should not receive this event now
+ // that ZooCache is using Persistent Watchers. Not logging an error
here.
+ // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
+ // may receive this event (Fixed in 3.9.0)
+ case NodeDeleted:
+ // ignore
+ case None:
+ // ignore
+ break;
+ default:
+ log.warn("Unhandled {}", event);
+ }
+ }
+}