This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new bb9a967f56 Cleans up non-existent migrating metadata tablets in 
manager (#4750)
bb9a967f56 is described below

commit bb9a967f5608986ef92d1299426c437750dfb7aa
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 24 12:06:11 2024 -0700

    Cleans up non-existent migrating metadata tablets in manager (#4750)
    
    Fixes #4475
---
 .../java/org/apache/accumulo/manager/Manager.java  | 82 ++++++++++++----------
 1 file changed, 45 insertions(+), 37 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index d61c43a491..514c0271d4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -54,18 +55,14 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.InstanceId;
-import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AgeOffStore;
 import org.apache.accumulo.core.fate.Fate;
@@ -96,12 +93,10 @@ import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
 import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
 import org.apache.accumulo.core.spi.balancer.TabletBalancer;
@@ -712,21 +707,34 @@ public class Manager extends AbstractServer
      * migration will refer to a non-existing tablet, so it can never 
complete. Periodically scan
      * the metadata table and remove any migrating tablets that no longer 
exist.
      */
-    private void cleanupNonexistentMigrations(final AccumuloClient 
accumuloClient)
-        throws TableNotFoundException {
-      Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY);
-      TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      scanner.setRange(MetadataSchema.TabletsSection.getRange());
-      Set<KeyExtent> notSeen;
+    private void cleanupNonexistentMigrations(final ClientContext 
clientContext) {
+
+      Map<DataLevel,Set<KeyExtent>> notSeen;
+
       synchronized (migrations) {
-        notSeen = new HashSet<>(migrations.keySet());
+        notSeen = partitionMigrations(migrations.keySet());
       }
-      for (Entry<Key,Value> entry : scanner) {
-        KeyExtent extent = KeyExtent.fromMetaPrevRow(entry);
-        notSeen.remove(extent);
+
+      // for each level find the set of migrating tablets that do not exist in the metadata store
+      for (DataLevel dataLevel : DataLevel.values()) {
+        var notSeenForLevel = notSeen.getOrDefault(dataLevel, Set.of());
+        if (notSeenForLevel.isEmpty() || dataLevel == DataLevel.ROOT) {
+          // No need to scan this level if there are no migrations. The root tablet is always
+          // expected to exist, so no need to read its metadata.
+          continue;
+        }
+
+        try (var tablets = 
clientContext.getAmple().readTablets().forLevel(dataLevel)
+            .fetch(TabletMetadata.ColumnType.PREV_ROW).build()) {
+          // A goal of this code is to avoid reading all extents in the metadata table into memory
+          // when finding extents that exist in the migrating set and not in the metadata table.
+          tablets.forEach(tabletMeta -> 
notSeenForLevel.remove(tabletMeta.getExtent()));
+        }
+
+        // remove any tablets that previously existed in migrations for this 
level but were not seen
+        // in the metadata table for the level
+        migrations.keySet().removeAll(notSeenForLevel);
       }
-      // remove tablets that used to be in migrations and were not seen in the 
metadata table
-      migrations.keySet().removeAll(notSeen);
     }
 
     /**
@@ -787,6 +795,23 @@ public class Manager extends AbstractServer
 
   }
 
+  /**
+   * balanceTablets() balances tables by DataLevel. Return the current set of 
migrations partitioned
+   * by DataLevel
+   */
+  private static Map<DataLevel,Set<KeyExtent>>
+      partitionMigrations(final Set<KeyExtent> migrations) {
+    final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = new 
EnumMap<>(DataLevel.class);
+    // populate to prevent NPE
+    for (DataLevel dl : DataLevel.values()) {
+      partitionedMigrations.put(dl, new HashSet<>());
+    }
+    migrations.forEach(ke -> {
+      partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
+    });
+    return partitionedMigrations;
+  }
+
   private class StatusThread implements Runnable {
 
     private boolean goodStats() {
@@ -957,23 +982,6 @@ public class Manager extends AbstractServer
       }
     }
 
-    /**
-     * balanceTablets() balances tables by DataLevel. Return the current set 
of migrations
-     * partitioned by DataLevel
-     */
-    private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final 
Set<KeyExtent> migrations) {
-      final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
-          new HashMap<>(DataLevel.values().length);
-      // populate to prevent NPE
-      for (DataLevel dl : DataLevel.values()) {
-        partitionedMigrations.put(dl, new HashSet<>());
-      }
-      migrations.forEach(ke -> {
-        partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
-      });
-      return partitionedMigrations;
-    }
-
     /**
      * Given the current tserverStatus map and a DataLevel, return a view of 
the tserverStatus map
      * that only contains entries for tables in the DataLevel

Reply via email to