This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 663369f517 Remove TableObserver and table state cache (#5422) 663369f517 is described below commit 663369f5175218481456a541be5bd70ab8bc31b7 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Mon Mar 24 21:31:43 2025 -0400 Remove TableObserver and table state cache (#5422) * Remove observer registry and table state cache from TableManager * Move TableStateWatcher to its own file * Remove TableObserver interface (only used by Manager) and replace with manager explicitly registering TableStateWatcher and its handler code with ZooCache --- .../accumulo/server/tables/TableManager.java | 149 ++------------------- .../accumulo/server/tables/TableObserver.java | 30 ----- .../main/java/org/apache/accumulo/gc/GCRun.java | 1 - .../java/org/apache/accumulo/manager/Manager.java | 26 ++-- .../apache/accumulo/manager/TableStateWatcher.java | 85 ++++++++++++ 5 files changed, 108 insertions(+), 183 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java index adab9869f7..11b3002f7e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java @@ -20,10 +20,7 @@ package org.apache.accumulo.server.tables; import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -40,15 +37,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.tables.TableNameUtil; -import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.store.NamespacePropKey; import org.apache.accumulo.server.conf.store.PropStore; import org.apache.accumulo.server.conf.store.TablePropKey; import org.apache.accumulo.server.util.PropUtil; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,20 +53,12 @@ public class TableManager { private static final Logger log = LoggerFactory.getLogger(TableManager.class); private static final byte[] ZERO_BYTE = {'0'}; - private final Set<TableObserver> observers = Collections.synchronizedSet(new HashSet<>()); - private final Map<TableId,TableState> tableStateCache = - Collections.synchronizedMap(new HashMap<>()); - private final ServerContext context; private final ZooReaderWriter zoo; public TableManager(ServerContext context) { this.context = context; this.zoo = context.getZooSession().asReaderWriter(); - - // add our Watcher to the shared ZooCache - context.getZooCache().addZooCacheWatcher(new TableStateWatcher()); - updateTableStateCache(); } public void prepareNewNamespaceState(NamespaceId namespaceId, String namespace, @@ -110,10 +96,6 @@ public class TableManager { } } - public TableState getTableState(TableId tableId) { - return tableStateCache.get(tableId); - } - public synchronized void transitionTableState(final TableId tableId, final TableState newState, final EnumSet<TableState> expectedCurrStates) { Preconditions.checkArgument(newState != TableState.UNKNOWN); @@ -161,40 +143,25 @@ public class TableManager { } } - private void updateTableStateCache() { - synchronized (tableStateCache) { - for (String tableId : context.getZooCache().getChildren(Constants.ZTABLES)) { - if (context.getZooCache().get(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE) - != null) { - updateTableStateCache(TableId.of(tableId)); - } - } - } - } - - public TableState updateTableStateCache(TableId tableId) { - synchronized (tableStateCache) { - TableState tState = TableState.UNKNOWN; - byte[] data = - context.getZooCache().get(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE); - if (data != null) { - String sState = new String(data, UTF_8); - try { - tState = TableState.valueOf(sState); - } catch (IllegalArgumentException e) { - log.error("Unrecognized state for table with tableId={}: {}", tableId, sState); - } - tableStateCache.put(tableId, tState); + public TableState getTableState(TableId tableId) { + TableState tState = TableState.UNKNOWN; + byte[] data = + context.getZooCache().get(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE); + if (data != null) { + String sState = new String(data, UTF_8); + try { + tState = TableState.valueOf(sState); + } catch (IllegalArgumentException e) { + log.error("Unrecognized state for table with tableId={}: {}", tableId, sState); } - return tState; } + return tState; } public void addTable(TableId tableId, NamespaceId namespaceId, String tableName) throws KeeperException, InterruptedException, NamespaceNotFoundException { prepareNewTableState(tableId, namespaceId, tableName, TableState.NEW, NodeExistsPolicy.OVERWRITE); - updateTableStateCache(tableId); } public void cloneTable(TableId srcTableId, TableId tableId, String tableName, @@ -209,100 +176,12 @@ public class TableManager { PropUtil.setProperties(context, TablePropKey.of(tableId), propertiesToSet); PropUtil.removeProperties(context, TablePropKey.of(tableId), propertiesToExclude); - - updateTableStateCache(tableId); } public void removeTable(TableId tableId) throws KeeperException, InterruptedException { - synchronized (tableStateCache) { - tableStateCache.remove(tableId); - zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE, - NodeMissingPolicy.SKIP); - zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP); - } - } - - public boolean addObserver(TableObserver to) { - synchronized (observers) { - synchronized (tableStateCache) { - to.initialize(); - return observers.add(to); - } - } - } - - private class TableStateWatcher implements ZooCacheWatcher { - @Override - public void accept(WatchedEvent event) { - if (log.isTraceEnabled()) { - log.trace("{}", event); - } - final String zPath = event.getPath(); - final EventType zType = event.getType(); - - TableId tableId = null; - - if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) { - String suffix = zPath.substring(Constants.ZTABLES.length() + 1); - if (suffix.contains("/")) { - String[] sa = suffix.split("/", 2); - if (Constants.ZTABLE_STATE.equals("/" + sa[1])) { - tableId = TableId.of(sa[0]); - } - } - if (tableId == null) { - log.trace("Unhandled path {}", event); - return; - } - } - - switch (zType) { - case NodeChildrenChanged: - // According to documentation we should not receive this event now - // that ZooCache is using Persistent Watchers. Not logging an error here. - // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we - // may receive this event (Fixed in 3.9.0) - break; - case NodeCreated: - case NodeDataChanged: - // state transition - if (tableId != null) { - TableState tState = updateTableStateCache(tableId); - log.debug("State transition to {} @ {}", tState, event); - synchronized (observers) { - for (TableObserver to : observers) { - to.stateChanged(tableId, tState); - } - } - } - break; - case NodeDeleted: - if (zPath != null && tableId != null - && (zPath.equals(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE) - || zPath.equals(Constants.ZTABLES + "/" + tableId + Constants.ZCONFIG) - || zPath.equals(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME))) { - tableStateCache.remove(tableId); - } - break; - case None: - switch (event.getState()) { - case Expired: - log.trace("Session expired; {}", event); - synchronized (observers) { - for (TableObserver to : observers) { - to.sessionExpired(); - } - } - break; - case SyncConnected: - default: - log.trace("Ignored {}", event); - } - break; - default: - log.warn("Unhandled {}", event); - } - } + zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE, + NodeMissingPolicy.SKIP); + zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP); } public void removeNamespace(NamespaceId namespaceId) diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java deleted file mode 100644 index 6b20a3fb91..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.tables; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.manager.state.tables.TableState; - -public interface TableObserver { - void initialize(); - - void stateChanged(TableId tableId, TableState tState); - - void sessionExpired(); -} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 3a79855d8d..bb089c8885 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -341,7 +341,6 @@ public class GCRun implements GarbageCollectionEnvironment { if (parts.length > 2) { TableId tableId = TableId.of(parts[1]); String tabletDir = parts[2]; - context.getTableManager().updateTableStateCache(tableId); TableState tableState = context.getTableManager().getTableState(tableId); if (tableState != null && tableState != TableState.DELETING) { // clone directories don't always exist diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 94ac0e76b5..4c6d75755b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -149,7 +149,6 @@ import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager; import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor; import org.apache.accumulo.server.tables.TableManager; -import org.apache.accumulo.server.tables.TableObserver; import org.apache.accumulo.server.util.ScanServerMetadataEntries; import org.apache.accumulo.server.util.ServerBulkImportStatus; import org.apache.accumulo.server.util.TableInfoUtil; @@ -179,7 +178,7 @@ import io.opentelemetry.context.Scope; * <p> * The manager will also coordinate log recoveries and reports general status. */ -public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver { +public class Manager extends AbstractServer implements LiveTServerSet.Listener { static final Logger log = LoggerFactory.getLogger(Manager.class); @@ -1174,7 +1173,14 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence); - context.getTableManager().addObserver(this); + context.getZooCache().addZooCacheWatcher(new TableStateWatcher((tableId, event) -> { + TableState state = getTableManager().getTableState(tableId); + log.debug("Table state transition to {} @ {}", state, event); + nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", tableId, state); + if (state == TableState.OFFLINE) { + clearMigrations(tableId); + } + })); tableInformationStatusPool = ThreadPools.getServerThreadPools() .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false); @@ -1709,20 +1715,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, } } - @Override - public void stateChanged(TableId tableId, TableState state) { - nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", tableId, state); - if (state == TableState.OFFLINE) { - clearMigrations(tableId); - } - } - - @Override - public void initialize() {} - - @Override - public void sessionExpired() {} - public Set<TableId> onlineTables() { Set<TableId> result = new HashSet<>(); if (getManagerState() != ManagerState.NORMAL) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java new file mode 100644 index 0000000000..5b53e03d6c --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class TableStateWatcher implements ZooCacheWatcher { + + private final BiConsumer<TableId,WatchedEvent> stateChangeConsumer; + + TableStateWatcher(BiConsumer<TableId,WatchedEvent> stateChangeConsumer) { + this.stateChangeConsumer = stateChangeConsumer; + } + + private static final Logger log = LoggerFactory.getLogger(TableStateWatcher.class); + + @Override + public void accept(WatchedEvent event) { + log.trace("{}", event); + final String zPath = event.getPath(); + final EventType zType = event.getType(); + + TableId tableId = null; + + if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) { + String suffix = zPath.substring(Constants.ZTABLES.length() + 1); + if (suffix.contains("/")) { + String[] sa = suffix.split("/", 2); + if (Constants.ZTABLE_STATE.equals("/" + sa[1])) { + tableId = TableId.of(sa[0]); + } + } + if (tableId == null) { + log.trace("Unhandled path {}", event); + return; + } + } + + switch (zType) { + case NodeCreated: + case NodeDataChanged: + // state transition + if (tableId != null) { + stateChangeConsumer.accept(tableId, event); + } + break; + case NodeChildrenChanged: + // According to documentation we should not receive this event now + // that ZooCache is using Persistent Watchers. Not logging an error here. + // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we + // may receive this event (Fixed in 3.9.0) + case NodeDeleted: + // ignore + case None: + // ignore + break; + default: + log.warn("Unhandled {}", event); + } + } +}