This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 05ff5e26e4 Manager can queue up tablets for TabletGroupWatcher (#3447)
05ff5e26e4 is described below

commit 05ff5e26e4420567b7983ea37f756e3d1f90d5af
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Jul 28 13:59:07 2023 -0400

    Manager can queue up tablets for TabletGroupWatcher (#3447)
    
    When the Manager performs a tablet management function
    it can now queue that tablet up for maintenance in the
    TabletGroupWatcher by calling
    Manager.getTabletStateStore(DataLevel).addTabletStateChange(TabletManagement)
---
 .../core/metadata/schema/TabletsMetadata.java      |   4 +
 .../manager/state/LoggingTabletStateStore.java     |   5 +
 .../server/manager/state/MetaDataStateStore.java   |  11 +-
 .../server/manager/state/RootTabletStateStore.java |   3 +-
 .../manager/state/TabletManagementScanner.java     |  31 +++-
 .../server/manager/state/TabletStateStore.java     |   9 +
 .../server/manager/state/ZooTabletStateStore.java  |   7 +
 .../java/org/apache/accumulo/manager/Manager.java  |  30 +++-
 .../manager/ManagerClientServiceHandler.java       |  23 ++-
 .../test/functional/TabletManagementScannerIT.java | 189 +++++++++++++++++++++
 10 files changed, 296 insertions(+), 16 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index a3db02c2b2..ee517f27dc 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -71,6 +71,7 @@ import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
@@ -297,6 +298,9 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
         fetchedCols.add(colToFetch);
 
         switch (colToFetch) {
+          case CHOPPED:
+            families.add(ChoppedColumnFamily.NAME);
+            break;
           case CLONED:
             families.add(ClonedColumnFamily.NAME);
             break;
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
index 9f45e36228..1fdd21b86a 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
@@ -58,6 +58,11 @@ class LoggingTabletStateStore implements TabletStateStore {
     return wrapped.iterator();
   }
 
+  @Override
+  public boolean addTabletStateChange(TabletManagement tablet) {
+    return this.wrapped.addTabletStateChange(tablet);
+  }
+
   @Override
   public void setFutureLocations(Collection<Assignment> assignments)
       throws DistributedStoreException {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index dedffefd14..7cf77c311a 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.server.manager.state;
 
 import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
 
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.manager.state.TabletManagement;
@@ -36,6 +37,8 @@ class MetaDataStateStore extends AbstractTabletStateStore 
implements TabletState
   private final String targetTableName;
   private final Ample ample;
   private final DataLevel level;
+  protected final ArrayBlockingQueue<TabletManagement> knownStateChanges =
+      new ArrayBlockingQueue<>(1_000);
 
   protected MetaDataStateStore(DataLevel level, ClientContext context, 
CurrentState state,
       String targetTableName) {
@@ -58,7 +61,13 @@ class MetaDataStateStore extends AbstractTabletStateStore 
implements TabletState
 
   @Override
   public ClosableIterator<TabletManagement> iterator() {
-    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, targetTableName);
+    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, targetTableName,
+        knownStateChanges);
+  }
+
+  @Override
+  public boolean addTabletStateChange(TabletManagement tablet) {
+    return this.knownStateChanges.offer(tablet);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
index 034940fb0b..e34048512a 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
@@ -32,7 +32,8 @@ class RootTabletStateStore extends MetaDataStateStore {
 
   @Override
   public ClosableIterator<TabletManagement> iterator() {
-    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, RootTable.NAME);
+    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, RootTable.NAME,
+        knownStateChanges);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
index 619189babb..2ad43397d9 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
@@ -26,7 +26,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -58,9 +60,11 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
   private final BatchScanner mdScanner;
   private final Iterator<Entry<Key,Value>> iter;
   private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final Queue<TabletManagement> knownTabletModifications;
 
-  TabletManagementScanner(ClientContext context, Range range, CurrentState 
state,
-      String tableName) {
+  // This constructor is called from TabletStateStore implementations
+  public TabletManagementScanner(ClientContext context, Range range, 
CurrentState state,
+      String tableName, Queue<TabletManagement> knownTabletModifications) {
     // scan over metadata table, looking for tablets in the wrong state based 
on the live servers
     // and online tables
     try {
@@ -93,10 +97,12 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     TabletManagementIterator.configureScanner(mdScanner, state);
     mdScanner.setRanges(Collections.singletonList(range));
     iter = mdScanner.iterator();
+    this.knownTabletModifications = knownTabletModifications;
   }
 
+  // This constructor is called from utilities and tests
   public TabletManagementScanner(ClientContext context, Range range, String 
tableName) {
-    this(context, range, null, tableName);
+    this(context, range, null, tableName, new ArrayBlockingQueue<>(1_000));
   }
 
   @Override
@@ -114,6 +120,9 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     if (closed.get()) {
       return false;
     }
+    if (!knownTabletModifications.isEmpty()) {
+      return true;
+    }
     boolean result = iter.hasNext();
     if (!result) {
       close();
@@ -126,12 +135,20 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     if (closed.get()) {
       throw new NoSuchElementException(this.getClass().getSimpleName() + " is 
closed");
     }
+    if (!knownTabletModifications.isEmpty()) {
+      TabletManagement tm = knownTabletModifications.poll();
+      log.trace("Returning known tablet modification, extent: {}, hostingGoal: 
{}, actions: {}",
+          tm.getTabletMetadata().getExtent(), 
tm.getTabletMetadata().getHostingGoal(),
+          tm.getActions());
+      return tm;
+    }
     Entry<Key,Value> e = iter.next();
     try {
-      TabletManagement tmi = TabletManagementIterator.decode(e);
-      log.trace("Returning metadata tablet, extent: {}, hostingGoal: {}",
-          tmi.getTabletMetadata().getExtent(), 
tmi.getTabletMetadata().getHostingGoal());
-      return tmi;
+      TabletManagement tm = TabletManagementIterator.decode(e);
+      log.trace("Returning metadata tablet, extent: {}, hostingGoal: {}, 
actions: {}",
+          tm.getTabletMetadata().getExtent(), 
tm.getTabletMetadata().getHostingGoal(),
+          tm.getActions());
+      return tm;
     } catch (IOException e1) {
       throw new RuntimeException("Error creating TabletMetadata object", e1);
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index ea058ab2dd..c44f01bd2d 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -52,6 +52,15 @@ public interface TabletStateStore extends 
Iterable<TabletManagement> {
   @Override
   ClosableIterator<TabletManagement> iterator();
 
+  /**
+   * Notify this TabletStateStore that a Tablet needs to be returned by the 
iterator() method so
+   * that the TabletGroupWatcher can perform the associated action on this 
Tablet.
+   *
+   * @param tablet TabletMetadata and Action that needs to be performed on it.
+   * @return true if tablet modification accepted for processing, false 
otherwise.
+   */
+  boolean addTabletStateChange(TabletManagement tablet);
+
   /**
    * Store the assigned locations in the data store.
    */
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index 1c70c20ce6..cc6aff0519 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@ -97,6 +97,13 @@ class ZooTabletStateStore extends AbstractTabletStateStore 
implements TabletStat
     };
   }
 
+  @Override
+  public boolean addTabletStateChange(TabletManagement tablet) {
+    // This method does nothing, this TabletStateStore always returns
+    // the TabletManagement object for the Root Tablet.
+    return true;
+  }
+
   private static void validateAssignments(Collection<Assignment> assignments) {
     if (assignments.size() != 1) {
       throw new IllegalArgumentException("There is only one root tablet");
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0590a13e6e..78a769b5f9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -238,6 +238,10 @@ public class Manager extends AbstractServer
   private final long timeToCacheRecoveryWalExistence;
   private ExecutorService tableInformationStatusPool = null;
 
+  private final TabletStateStore rootTabletStore;
+  private final TabletStateStore metadataTabletStore;
+  private final TabletStateStore userTabletStore;
+
   @Override
   public synchronized ManagerState getManagerState() {
     return state;
@@ -457,6 +461,10 @@ public class Manager extends AbstractServer
 
     final long tokenLifetime = 
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);
 
+    this.rootTabletStore = TabletStateStore.getStoreForLevel(DataLevel.ROOT, 
context, this);
+    this.metadataTabletStore = 
TabletStateStore.getStoreForLevel(DataLevel.METADATA, context, this);
+    this.userTabletStore = TabletStateStore.getStoreForLevel(DataLevel.USER, 
context, this);
+
     authenticationTokenKeyManager = null;
     keyDistributor = null;
     if (getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
@@ -1270,8 +1278,7 @@ public class Manager extends AbstractServer
     this.splitter = new Splitter(context);
     this.splitter.start();
 
-    watchers.add(new TabletGroupWatcher(this,
-        TabletStateStore.getStoreForLevel(DataLevel.USER, context, this), 
null) {
+    watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null) {
       @Override
       boolean canSuspendTablets() {
         // Always allow user data tablets to enter suspended state.
@@ -1279,8 +1286,7 @@ public class Manager extends AbstractServer
       }
     });
 
-    watchers.add(new TabletGroupWatcher(this,
-        TabletStateStore.getStoreForLevel(DataLevel.METADATA, context, this), 
watchers.get(0)) {
+    watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, 
watchers.get(0)) {
       @Override
       boolean canSuspendTablets() {
         // Allow metadata tablets to enter suspended state only if so 
configured. Generally
@@ -1291,8 +1297,7 @@ public class Manager extends AbstractServer
       }
     });
 
-    watchers.add(new TabletGroupWatcher(this,
-        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context), 
watchers.get(1)) {
+    watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, 
watchers.get(1)) {
       @Override
       boolean canSuspendTablets() {
         // Never allow root tablet to enter suspended state.
@@ -1906,4 +1911,17 @@ public class Manager extends AbstractServer
     tabletBalancer.getAssignments(params);
   }
 
+  public TabletStateStore getTabletStateStore(DataLevel level) {
+    switch (level) {
+      case METADATA:
+        return this.metadataTabletStore;
+      case ROOT:
+        return this.rootTabletStore;
+      case USER:
+        return this.userTabletStore;
+      default:
+        throw new IllegalStateException("Unhandled DataLevel value: " + level);
+    }
+  }
+
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index 77241db239..77b058092f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -58,6 +59,8 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.manager.state.TabletManagement;
+import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
@@ -70,8 +73,10 @@ import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.securityImpl.thrift.TDelegationToken;
@@ -649,6 +654,7 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
     manager.mustBeOnline(tableId);
 
     log.info("Tablet hosting requested for: {} ", extents);
+    final List<KeyExtent> success = new ArrayList<>();
     final Ample ample = manager.getContext().getAmple();
     try (var mutator = ample.conditionallyMutateTablets()) {
       extents.forEach(e -> {
@@ -666,16 +672,31 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
       mutator.process().forEach((extent, result) -> {
         if (result.getStatus() == Status.ACCEPTED) {
           // cache this success for a bit
+          success.add(extent);
           recentHostingRequest.put(extent, System.currentTimeMillis());
         } else {
           if (log.isTraceEnabled()) {
-            // only read the metdata if the logging is enabled
+            // only read the metadata if the logging is enabled
             log.trace("Failed to set hosting request {}", 
result.readMetadata());
           }
         }
       });
     }
 
+    // Register the successful extent updates with the appropriate 
TabletStateStore.
+    // This will bump these tablets to the head of the line and the 
TabletGroupWatcher
+    // will process these updates before the ones returned from scanning the 
underlying
+    // table.
+    if (!success.isEmpty()) {
+      final Set<ManagementAction> actions = 
Set.of(ManagementAction.NEEDS_LOCATION_UPDATE);
+      ample.readTablets().forTablets(success, Optional.empty())
+          .fetch(TabletManagement.CONFIGURED_COLUMNS.toArray(new ColumnType[] 
{})).build()
+          .forEach(tm -> {
+            manager.getTabletStateStore(DataLevel.of(tm.getTableId()))
+                .addTabletStateChange(new TabletManagement(actions, tm));
+          });
+    }
+
     // this will kick the tablet group watcher into action
     manager.getEventCoordinator().event("Tablet hosting requested tableId:%s 
extents:%d", tableId,
         extents.size());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
new file mode 100644
index 0000000000..11b4e58ac9
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.Locations;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.manager.state.TabletManagement;
+import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.manager.state.TabletManagementScanner;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterators;
+
+public class TabletManagementScannerIT extends SharedMiniClusterBase {
+
+  public static class TMSIT_Config implements MiniClusterConfigurationCallback 
{
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+      cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "10");
+      cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10");
+      cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "180");
+      cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10s");
+      cfg.setProperty(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD, 
"15");
+    }
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(2);
+  }
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new TMSIT_Config());
+  }
+
+  @AfterAll
+  public static void afterAll() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testKnownTabletModificationsTakePriority() throws Exception {
+
+    final ConcurrentLinkedQueue<TabletManagement> mods = new 
ConcurrentLinkedQueue<>();
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      // Confirm that the root and metadata tables are hosted
+      Locations rootLocations =
+          c.tableOperations().locate(RootTable.NAME, 
Collections.singletonList(new Range()));
+      rootLocations.groupByTablet().keySet()
+          .forEach(tid -> assertNotNull(rootLocations.getTabletLocation(tid)));
+
+      Locations metadataLocations =
+          c.tableOperations().locate(MetadataTable.NAME, 
Collections.singletonList(new Range()));
+      metadataLocations.groupByTablet().keySet()
+          .forEach(tid -> 
assertNotNull(metadataLocations.getTabletLocation(tid)));
+
+      String tableName = super.getUniqueNames(1)[0];
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      SortedSet<Text> splits = new TreeSet<>();
+      IntStream.range(97, 122).forEach((i) -> {
+        splits.add(new Text(String.valueOf((char) i)));
+      });
+      ntc.withSplits(splits);
+      c.tableOperations().create(tableName, ntc);
+
+      TableId tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // wait for the tablets to exist in the metadata table. The tablets
+      // will not be hosted so the current location will be empty. Passing null
+      // to the TabletManagementScanner should cause all tablets to be returned
+      // by the TabletManagementIterator.
+      try (TabletManagementScanner s = new 
TabletManagementScanner((ClientContext) c,
+          new Range(TabletsSection.getRange(tableId)), null, 
MetadataTable.NAME, mods)) {
+        Wait.waitFor(() -> Iterators.size(s) == 26, 10_000, 250);
+      }
+
+      // set the tablet hosting goal on the first half of the tablets
+      // so they will need a location update action
+      c.tableOperations().setTabletHostingGoal(tableName, new Range(null, "m"),
+          TabletHostingGoal.ALWAYS);
+
+      // let's add a known modification for the last tablet
+      TabletsMetadata tms = 
getCluster().getServerContext().getAmple().readTablets()
+          .forTablet(new KeyExtent(tableId, null, new Text("y")))
+          .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, 
ColumnType.LOGS,
+              ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, 
ColumnType.FILES,
+              ColumnType.LAST, ColumnType.OPID)
+          .build();
+      assertEquals(1, Iterators.size(tms.iterator()));
+      TabletMetadata tm = tms.iterator().next();
+      mods.add(new 
TabletManagement(EnumSet.of(ManagementAction.NEEDS_SPLITTING), tm));
+
+      // check the contents of the TabletManagementScanner
+      try (TabletManagementScanner s = new 
TabletManagementScanner((ClientContext) c,
+          new Range(TabletsSection.getRange(tableId)), null, 
MetadataTable.NAME, mods)) {
+
+        @SuppressWarnings("resource")
+        Iterator<TabletManagement> iter = s;
+        // Confirm that the first object is the one we added to the known mods 
queue.
+        assertFalse(mods.isEmpty());
+        assertTrue(iter.hasNext());
+        TabletManagement tman = iter.next();
+        assertEquals(1, tman.getActions().size());
+        
assertTrue(tman.getActions().contains(ManagementAction.NEEDS_SPLITTING));
+        assertEquals(tm, tman.getTabletMetadata());
+
+        assertTrue(mods.isEmpty());
+
+        // The TabletManagementScanner is going to return all of the tablets 
because
+        // we passed null for the state. Confirm that the hosting goal is 
ALWAYS
+        // for the first 13 and ondemand for the latter half. When running in 
the Manager
+        // the latter half of the results would not be returned.
+        for (int i = 0; i < 26; i++) {
+          assertTrue(iter.hasNext());
+          tman = iter.next();
+          assertEquals(1, tman.getActions().size());
+          
assertTrue(tman.getActions().contains(ManagementAction.NEEDS_LOCATION_UPDATE));
+          if (i < 13) {
+            assertEquals(TabletHostingGoal.ALWAYS, 
tman.getTabletMetadata().getHostingGoal());
+          } else {
+            assertEquals(TabletHostingGoal.ONDEMAND, 
tman.getTabletMetadata().getHostingGoal());
+          }
+        }
+
+        assertFalse(iter.hasNext());
+      }
+
+    }
+  }
+
+}

Reply via email to