This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 663369f517 Remove TableObserver and table state cache (#5422)
663369f517 is described below

commit 663369f5175218481456a541be5bd70ab8bc31b7
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Mar 24 21:31:43 2025 -0400

    Remove TableObserver and table state cache (#5422)
    
    * Remove observer registry and table state cache from TableManager
    * Move TableStateWatcher to its own file
    * Remove TableObserver interface (only used by Manager) and replace with
      manager explicitly registering TableStateWatcher and its handler code
      with ZooCache
---
 .../accumulo/server/tables/TableManager.java       | 149 ++-------------------
 .../accumulo/server/tables/TableObserver.java      |  30 -----
 .../main/java/org/apache/accumulo/gc/GCRun.java    |   1 -
 .../java/org/apache/accumulo/manager/Manager.java  |  26 ++--
 .../apache/accumulo/manager/TableStateWatcher.java |  85 ++++++++++++
 5 files changed, 108 insertions(+), 183 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index adab9869f7..11b3002f7e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@ -20,10 +20,7 @@ package org.apache.accumulo.server.tables;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -40,15 +37,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.tables.TableNameUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.store.NamespacePropKey;
 import org.apache.accumulo.server.conf.store.PropStore;
 import org.apache.accumulo.server.conf.store.TablePropKey;
 import org.apache.accumulo.server.util.PropUtil;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,20 +53,12 @@ public class TableManager {
   private static final Logger log = 
LoggerFactory.getLogger(TableManager.class);
   private static final byte[] ZERO_BYTE = {'0'};
 
-  private final Set<TableObserver> observers = Collections.synchronizedSet(new 
HashSet<>());
-  private final Map<TableId,TableState> tableStateCache =
-      Collections.synchronizedMap(new HashMap<>());
-
   private final ServerContext context;
   private final ZooReaderWriter zoo;
 
   public TableManager(ServerContext context) {
     this.context = context;
     this.zoo = context.getZooSession().asReaderWriter();
-
-    // add our Watcher to the shared ZooCache
-    context.getZooCache().addZooCacheWatcher(new TableStateWatcher());
-    updateTableStateCache();
   }
 
   public void prepareNewNamespaceState(NamespaceId namespaceId, String 
namespace,
@@ -110,10 +96,6 @@ public class TableManager {
     }
   }
 
-  public TableState getTableState(TableId tableId) {
-    return tableStateCache.get(tableId);
-  }
-
   public synchronized void transitionTableState(final TableId tableId, final 
TableState newState,
       final EnumSet<TableState> expectedCurrStates) {
     Preconditions.checkArgument(newState != TableState.UNKNOWN);
@@ -161,40 +143,25 @@ public class TableManager {
     }
   }
 
-  private void updateTableStateCache() {
-    synchronized (tableStateCache) {
-      for (String tableId : 
context.getZooCache().getChildren(Constants.ZTABLES)) {
-        if (context.getZooCache().get(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE)
-            != null) {
-          updateTableStateCache(TableId.of(tableId));
-        }
-      }
-    }
-  }
-
-  public TableState updateTableStateCache(TableId tableId) {
-    synchronized (tableStateCache) {
-      TableState tState = TableState.UNKNOWN;
-      byte[] data =
-          context.getZooCache().get(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE);
-      if (data != null) {
-        String sState = new String(data, UTF_8);
-        try {
-          tState = TableState.valueOf(sState);
-        } catch (IllegalArgumentException e) {
-          log.error("Unrecognized state for table with tableId={}: {}", 
tableId, sState);
-        }
-        tableStateCache.put(tableId, tState);
+  public TableState getTableState(TableId tableId) {
+    TableState tState = TableState.UNKNOWN;
+    byte[] data =
+        context.getZooCache().get(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE);
+    if (data != null) {
+      String sState = new String(data, UTF_8);
+      try {
+        tState = TableState.valueOf(sState);
+      } catch (IllegalArgumentException e) {
+        log.error("Unrecognized state for table with tableId={}: {}", tableId, 
sState);
       }
-      return tState;
     }
+    return tState;
   }
 
   public void addTable(TableId tableId, NamespaceId namespaceId, String 
tableName)
       throws KeeperException, InterruptedException, NamespaceNotFoundException 
{
     prepareNewTableState(tableId, namespaceId, tableName, TableState.NEW,
         NodeExistsPolicy.OVERWRITE);
-    updateTableStateCache(tableId);
   }
 
   public void cloneTable(TableId srcTableId, TableId tableId, String tableName,
@@ -209,100 +176,12 @@ public class TableManager {
 
     PropUtil.setProperties(context, TablePropKey.of(tableId), propertiesToSet);
     PropUtil.removeProperties(context, TablePropKey.of(tableId), 
propertiesToExclude);
-
-    updateTableStateCache(tableId);
   }
 
   public void removeTable(TableId tableId) throws KeeperException, 
InterruptedException {
-    synchronized (tableStateCache) {
-      tableStateCache.remove(tableId);
-      zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE,
-          NodeMissingPolicy.SKIP);
-      zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId, 
NodeMissingPolicy.SKIP);
-    }
-  }
-
-  public boolean addObserver(TableObserver to) {
-    synchronized (observers) {
-      synchronized (tableStateCache) {
-        to.initialize();
-        return observers.add(to);
-      }
-    }
-  }
-
-  private class TableStateWatcher implements ZooCacheWatcher {
-    @Override
-    public void accept(WatchedEvent event) {
-      if (log.isTraceEnabled()) {
-        log.trace("{}", event);
-      }
-      final String zPath = event.getPath();
-      final EventType zType = event.getType();
-
-      TableId tableId = null;
-
-      if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) {
-        String suffix = zPath.substring(Constants.ZTABLES.length() + 1);
-        if (suffix.contains("/")) {
-          String[] sa = suffix.split("/", 2);
-          if (Constants.ZTABLE_STATE.equals("/" + sa[1])) {
-            tableId = TableId.of(sa[0]);
-          }
-        }
-        if (tableId == null) {
-          log.trace("Unhandled path {}", event);
-          return;
-        }
-      }
-
-      switch (zType) {
-        case NodeChildrenChanged:
-          // According to documentation we should not receive this event now
-          // that ZooCache is using Persistent Watchers. Not logging an error 
here.
-          // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 
we
-          // may receive this event (Fixed in 3.9.0)
-          break;
-        case NodeCreated:
-        case NodeDataChanged:
-          // state transition
-          if (tableId != null) {
-            TableState tState = updateTableStateCache(tableId);
-            log.debug("State transition to {} @ {}", tState, event);
-            synchronized (observers) {
-              for (TableObserver to : observers) {
-                to.stateChanged(tableId, tState);
-              }
-            }
-          }
-          break;
-        case NodeDeleted:
-          if (zPath != null && tableId != null
-              && (zPath.equals(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE)
-                  || zPath.equals(Constants.ZTABLES + "/" + tableId + 
Constants.ZCONFIG)
-                  || zPath.equals(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_NAME))) {
-            tableStateCache.remove(tableId);
-          }
-          break;
-        case None:
-          switch (event.getState()) {
-            case Expired:
-              log.trace("Session expired; {}", event);
-              synchronized (observers) {
-                for (TableObserver to : observers) {
-                  to.sessionExpired();
-                }
-              }
-              break;
-            case SyncConnected:
-            default:
-              log.trace("Ignored {}", event);
-          }
-          break;
-        default:
-          log.warn("Unhandled {}", event);
-      }
-    }
+    zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId + 
Constants.ZTABLE_STATE,
+        NodeMissingPolicy.SKIP);
+    zoo.recursiveDelete(Constants.ZTABLES + "/" + tableId, 
NodeMissingPolicy.SKIP);
   }
 
   public void removeNamespace(NamespaceId namespaceId)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
deleted file mode 100644
index 6b20a3fb91..0000000000
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.tables;
-
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.manager.state.tables.TableState;
-
-public interface TableObserver {
-  void initialize();
-
-  void stateChanged(TableId tableId, TableState tState);
-
-  void sessionExpired();
-}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 3a79855d8d..bb089c8885 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -341,7 +341,6 @@ public class GCRun implements GarbageCollectionEnvironment {
               if (parts.length > 2) {
                 TableId tableId = TableId.of(parts[1]);
                 String tabletDir = parts[2];
-                context.getTableManager().updateTableStateCache(tableId);
                 TableState tableState = 
context.getTableManager().getTableState(tableId);
                 if (tableState != null && tableState != TableState.DELETING) {
                   // clone directories don't always exist
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 94ac0e76b5..4c6d75755b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -149,7 +149,6 @@ import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager;
 import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor;
 import org.apache.accumulo.server.tables.TableManager;
-import org.apache.accumulo.server.tables.TableObserver;
 import org.apache.accumulo.server.util.ScanServerMetadataEntries;
 import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.TableInfoUtil;
@@ -179,7 +178,7 @@ import io.opentelemetry.context.Scope;
  * <p>
  * The manager will also coordinate log recoveries and reports general status.
  */
-public class Manager extends AbstractServer implements LiveTServerSet.Listener, TableObserver {
+public class Manager extends AbstractServer implements LiveTServerSet.Listener {
 
   static final Logger log = LoggerFactory.getLogger(Manager.class);
 
@@ -1174,7 +1173,14 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
 
     recoveryManager = new RecoveryManager(this, 
timeToCacheRecoveryWalExistence);
 
-    context.getTableManager().addObserver(this);
+    context.getZooCache().addZooCacheWatcher(new TableStateWatcher((tableId, 
event) -> {
+      TableState state = getTableManager().getTableState(tableId);
+      log.debug("Table state transition to {} @ {}", state, event);
+      nextEvent.event(tableId, "Table state in zookeeper changed for %s to 
%s", tableId, state);
+      if (state == TableState.OFFLINE) {
+        clearMigrations(tableId);
+      }
+    }));
 
     tableInformationStatusPool = ThreadPools.getServerThreadPools()
         .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
@@ -1709,20 +1715,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
     }
   }
 
-  @Override
-  public void stateChanged(TableId tableId, TableState state) {
-    nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", 
tableId, state);
-    if (state == TableState.OFFLINE) {
-      clearMigrations(tableId);
-    }
-  }
-
-  @Override
-  public void initialize() {}
-
-  @Override
-  public void sessionExpired() {}
-
   public Set<TableId> onlineTables() {
     Set<TableId> result = new HashSet<>();
     if (getManagerState() != ManagerState.NORMAL) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java
new file mode 100644
index 0000000000..5b53e03d6c
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TableStateWatcher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager;
+
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.zookeeper.ZooCache.ZooCacheWatcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TableStateWatcher implements ZooCacheWatcher {
+
+  private final BiConsumer<TableId,WatchedEvent> stateChangeConsumer;
+
+  TableStateWatcher(BiConsumer<TableId,WatchedEvent> stateChangeConsumer) {
+    this.stateChangeConsumer = stateChangeConsumer;
+  }
+
+  private static final Logger log = 
LoggerFactory.getLogger(TableStateWatcher.class);
+
+  @Override
+  public void accept(WatchedEvent event) {
+    log.trace("{}", event);
+    final String zPath = event.getPath();
+    final EventType zType = event.getType();
+
+    TableId tableId = null;
+
+    if (zPath != null && zPath.startsWith(Constants.ZTABLES + "/")) {
+      String suffix = zPath.substring(Constants.ZTABLES.length() + 1);
+      if (suffix.contains("/")) {
+        String[] sa = suffix.split("/", 2);
+        if (Constants.ZTABLE_STATE.equals("/" + sa[1])) {
+          tableId = TableId.of(sa[0]);
+        }
+      }
+      if (tableId == null) {
+        log.trace("Unhandled path {}", event);
+        return;
+      }
+    }
+
+    switch (zType) {
+      case NodeCreated:
+      case NodeDataChanged:
+        // state transition
+        if (tableId != null) {
+          stateChangeConsumer.accept(tableId, event);
+        }
+        break;
+      case NodeChildrenChanged:
+        // According to documentation we should not receive this event now
+        // that ZooCache is using Persistent Watchers. Not logging an error 
here.
+        // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
+        // may receive this event (Fixed in 3.9.0)
+      case NodeDeleted:
+        // ignore
+      case None:
+        // ignore
+        break;
+      default:
+        log.warn("Unhandled {}", event);
+    }
+  }
+}

Reply via email to