This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 4e06b2f054 Avoids full table scans in TabletGroupWatcher in some 
situations. (#3705)
4e06b2f054 is described below

commit 4e06b2f05423469bab2c6d7b4ed27f147c5ead3c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Oct 6 16:17:03 2023 -0400

    Avoids full table scans in TabletGroupWatcher in some situations. (#3705)
    
    The manager has an internal class called EventCoordinator that is used
    to signal that something happened in the manager.  When anything
    signaled an event happened this would cause the tablet group watcher to
    scan the entire metadata table.  This commit makes a few changes to
    improve this.
    
    First it added the ability to EventCoordinator to signal with more
    specific information, like a something happened with a table or a
    tablet.
    
    Second the TabletGroupWatcher was adapted to react to this more specific
    signaling with narrower scans of the metadata table.  If a signal is
    made that something changed with a table, then only that tables metadata
    will be scanned.  The TabletGroupWatcher will still do full scans
    periodically (based on configuration) or when a something signals a full
    scan is needed.
    
    A new IT was added that creates a 1 million splits in a table and then
    reads and writes to around 100 random tablets.  The tablets are ondemand
    so must be brought online for the read and write.  If doing bringing
    them onling takes too long (because a full tables scan is done) then the
    test might timeout and fail.
---
 .../apache/accumulo/core/logging/TabletLogger.java |   3 +-
 ...TabletStateStore.java => ClosableIterable.java} |  23 +-
 .../manager/state/LoggingTabletStateStore.java     |  10 +-
 .../server/manager/state/MetaDataStateStore.java   |  16 +-
 .../server/manager/state/RootTabletStateStore.java |   9 +-
 .../manager/state/TabletManagementIterator.java    |   7 +-
 .../manager/state/TabletManagementScanner.java     |  25 +-
 .../server/manager/state/TabletStateStore.java     |  21 +-
 .../server/manager/state/ZooTabletStateStore.java  |  10 +-
 .../apache/accumulo/manager/EventCoordinator.java  | 123 +++-
 .../java/org/apache/accumulo/manager/Manager.java  |  17 +-
 .../manager/ManagerClientServiceHandler.java       |  32 +-
 .../accumulo/manager/TabletGroupWatcher.java       | 657 +++++++++++++--------
 .../coordinator/CompactionCoordinator.java         |   9 +-
 .../manager/tableOps/ChangeTableState.java         |   2 +-
 .../manager/tableOps/clone/FinishCloneTable.java   |   4 +-
 .../manager/tableOps/compact/CompactionDriver.java |  19 +-
 .../manager/tableOps/create/FinishCreateTable.java |   3 +-
 .../manager/tableOps/delete/DeleteTable.java       |   2 +-
 .../manager/tableOps/split/DeleteOperationIds.java |   4 +
 .../tableOps/tableImport/FinishImportTable.java    |   2 +-
 .../accumulo/test/functional/SplitMillionIT.java   | 132 +++++
 .../test/functional/TabletManagementScannerIT.java | 189 ------
 23 files changed, 740 insertions(+), 579 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index e737805761..8295b2c917 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@ -93,7 +93,8 @@ public class TabletLogger {
   }
 
   public static void split(KeyExtent parent, SortedSet<Text> splits) {
-    locLog.debug("Split {} into {}", parent, splits);
+    locLog.debug("Split {} into {} tablets", parent, splits.size() + 1);
+    locLog.trace("Split {} into {}", parent, splits);
   }
 
   /**
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java
similarity index 52%
copy from 
server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
copy to 
server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java
index e34048512a..fcf5f8de41 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ClosableIterable.java
@@ -18,26 +18,7 @@
  */
 package org.apache.accumulo.server.manager.state;
 
-import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.manager.state.TabletManagement;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-
-class RootTabletStateStore extends MetaDataStateStore {
-
-  RootTabletStateStore(DataLevel level, ClientContext context, CurrentState 
state) {
-    super(level, context, state, RootTable.NAME);
-  }
-
-  @Override
-  public ClosableIterator<TabletManagement> iterator() {
-    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, RootTable.NAME,
-        knownStateChanges);
-  }
-
+public interface ClosableIterable<T> extends Iterable<T> {
   @Override
-  public String name() {
-    return "Metadata Tablets";
-  }
+  ClosableIterator<T> iterator();
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
index 1fdd21b86a..ce2b3ede02 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import org.apache.accumulo.core.metadata.TServerInstance;
@@ -54,13 +55,8 @@ class LoggingTabletStateStore implements TabletStateStore {
   }
 
   @Override
-  public ClosableIterator<TabletManagement> iterator() {
-    return wrapped.iterator();
-  }
-
-  @Override
-  public boolean addTabletStateChange(TabletManagement tablet) {
-    return this.wrapped.addTabletStateChange(tablet);
+  public ClosableIterator<TabletManagement> iterator(List<Range> ranges) {
+    return wrapped.iterator(ranges);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 7cf77c311a..d69c114f5b 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -19,15 +19,15 @@
 package org.apache.accumulo.server.manager.state;
 
 import java.util.Collection;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.List;
 
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 
 class MetaDataStateStore extends AbstractTabletStateStore implements 
TabletStateStore {
@@ -37,8 +37,6 @@ class MetaDataStateStore extends AbstractTabletStateStore 
implements TabletState
   private final String targetTableName;
   private final Ample ample;
   private final DataLevel level;
-  protected final ArrayBlockingQueue<TabletManagement> knownStateChanges =
-      new ArrayBlockingQueue<>(1_000);
 
   protected MetaDataStateStore(DataLevel level, ClientContext context, 
CurrentState state,
       String targetTableName) {
@@ -60,14 +58,8 @@ class MetaDataStateStore extends AbstractTabletStateStore 
implements TabletState
   }
 
   @Override
-  public ClosableIterator<TabletManagement> iterator() {
-    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, targetTableName,
-        knownStateChanges);
-  }
-
-  @Override
-  public boolean addTabletStateChange(TabletManagement tablet) {
-    return this.knownStateChanges.offer(tablet);
+  public ClosableIterator<TabletManagement> iterator(List<Range> ranges) {
+    return new TabletManagementScanner(context, ranges, state, 
targetTableName);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
index e34048512a..4b574ade13 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
@@ -18,11 +18,13 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import java.util.List;
+
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 
 class RootTabletStateStore extends MetaDataStateStore {
 
@@ -31,9 +33,8 @@ class RootTabletStateStore extends MetaDataStateStore {
   }
 
   @Override
-  public ClosableIterator<TabletManagement> iterator() {
-    return new TabletManagementScanner(context, TabletsSection.getRange(), 
state, RootTable.NAME,
-        knownStateChanges);
+  public ClosableIterator<TabletManagement> iterator(List<Range> ranges) {
+    return new TabletManagementScanner(context, ranges, state, RootTable.NAME);
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index ea50b01662..e274d275a7 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -275,7 +275,7 @@ public class TabletManagementIterator extends 
SkippingIterator {
     final long sumOfFileSizes =
         
tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
     final boolean shouldSplit = sumOfFileSizes > splitThreshold;
-    LOG.debug("{} should split? sum: {}, threshold: {}, result: {}", 
tm.getExtent(), sumOfFileSizes,
+    LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", 
tm.getExtent(), sumOfFileSizes,
         splitThreshold, shouldSplit);
     return shouldSplit;
   }
@@ -292,8 +292,8 @@ public class TabletManagementIterator extends 
SkippingIterator {
         onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null;
 
     TabletState state = TabletState.compute(tm, current, balancer, 
tserverResourceGroups);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {}, 
hostingRequested: {}",
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} is {}. Table is {}line. Tablet hosting goal is {}, 
hostingRequested: {}",
           tm.getExtent(), state, (shouldBeOnline ? "on" : "off"), 
tm.getHostingGoal(),
           tm.getHostingRequested());
     }
@@ -402,7 +402,6 @@ public class TabletManagementIterator extends 
SkippingIterator {
     balancer = Property.createInstanceFromPropertyName(conf, 
Property.MANAGER_TABLET_BALANCER,
         TabletBalancer.class, new SimpleLoadBalancer());
     balancer.init(benv);
-    LOG.debug("Balancer is set to {}", balancer.getClass().getSimpleName());
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
index 2ad43397d9..253d6e4f45 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java
@@ -20,15 +20,12 @@ package org.apache.accumulo.server.manager.state;
 
 import java.io.IOException;
 import java.lang.ref.Cleaner.Cleanable;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -60,11 +57,10 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
   private final BatchScanner mdScanner;
   private final Iterator<Entry<Key,Value>> iter;
   private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final Queue<TabletManagement> knownTabletModifications;
 
   // This constructor is called from TabletStateStore implementations
-  public TabletManagementScanner(ClientContext context, Range range, 
CurrentState state,
-      String tableName, Queue<TabletManagement> knownTabletModifications) {
+  public TabletManagementScanner(ClientContext context, List<Range> ranges, 
CurrentState state,
+      String tableName) {
     // scan over metadata table, looking for tablets in the wrong state based 
on the live servers
     // and online tables
     try {
@@ -95,14 +91,13 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     }
     cleanable = CleanerUtil.unclosed(this, TabletManagementScanner.class, 
closed, log, mdScanner);
     TabletManagementIterator.configureScanner(mdScanner, state);
-    mdScanner.setRanges(Collections.singletonList(range));
+    mdScanner.setRanges(ranges);
     iter = mdScanner.iterator();
-    this.knownTabletModifications = knownTabletModifications;
   }
 
   // This constructor is called from utilities and tests
   public TabletManagementScanner(ClientContext context, Range range, String 
tableName) {
-    this(context, range, null, tableName, new ArrayBlockingQueue<>(1_000));
+    this(context, List.of(range), null, tableName);
   }
 
   @Override
@@ -120,9 +115,7 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     if (closed.get()) {
       return false;
     }
-    if (!knownTabletModifications.isEmpty()) {
-      return true;
-    }
+
     boolean result = iter.hasNext();
     if (!result) {
       close();
@@ -135,13 +128,7 @@ public class TabletManagementScanner implements 
ClosableIterator<TabletManagemen
     if (closed.get()) {
       throw new NoSuchElementException(this.getClass().getSimpleName() + " is 
closed");
     }
-    if (!knownTabletModifications.isEmpty()) {
-      TabletManagement tm = knownTabletModifications.poll();
-      log.trace("Returning known tablet modification, extent: {}, hostingGoal: 
{}, actions: {}",
-          tm.getTabletMetadata().getExtent(), 
tm.getTabletMetadata().getHostingGoal(),
-          tm.getActions());
-      return tm;
-    }
+
     Entry<Key,Value> e = iter.next();
     try {
       TabletManagement tm = TabletManagementIterator.decode(e);
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index c44f01bd2d..19cc5b5832 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +36,7 @@ import org.apache.hadoop.fs.Path;
 /**
  * Interface for storing information about tablet assignments. There are three 
implementations:
  */
-public interface TabletStateStore extends Iterable<TabletManagement> {
+public interface TabletStateStore extends ClosableIterable<TabletManagement> {
 
   /**
    * Get the level for this state store
@@ -47,19 +49,18 @@ public interface TabletStateStore extends 
Iterable<TabletManagement> {
   String name();
 
   /**
-   * Scan the information about the tablets covered by this store
+   * Scan the information about the tablets covered by this store that have 
end row in the specified
+   * ranges.
    */
-  @Override
-  ClosableIterator<TabletManagement> iterator();
+  ClosableIterator<TabletManagement> iterator(List<Range> ranges);
 
   /**
-   * Notify this TabletStateStore that a Tablet needs to be returned by the 
iterator() method so
-   * that the TabletGroupWatcher can perform the associated action on this 
Tablet.
-   *
-   * @param tablet TabletMetadata and Action that needs to be performed on it.
-   * @return true if tablet modification accepted for processing, false 
otherwise.
+   * Scan the information about all tablets covered by this store..
    */
-  boolean addTabletStateChange(TabletManagement tablet);
+  @Override
+  default ClosableIterator<TabletManagement> iterator() {
+    return iterator(List.of(MetadataSchema.TabletsSection.getRange()));
+  }
 
   /**
    * Store the assigned locations in the data store.
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index cc6aff0519..7fa5158da5 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -59,7 +60,7 @@ class ZooTabletStateStore extends AbstractTabletStateStore 
implements TabletStat
   }
 
   @Override
-  public ClosableIterator<TabletManagement> iterator() {
+  public ClosableIterator<TabletManagement> iterator(List<Range> ranges) {
 
     return new ClosableIterator<>() {
       boolean finished = false;
@@ -97,13 +98,6 @@ class ZooTabletStateStore extends AbstractTabletStateStore 
implements TabletStat
     };
   }
 
-  @Override
-  public boolean addTabletStateChange(TabletManagement tablet) {
-    // This method does nothing, this TabletStateStore always returns
-    // the TabletManagement object for the Root Tablet.
-    return true;
-  }
-
   private static void validateAssignments(Collection<Assignment> assignments) {
     if (assignments.size() != 1) {
       throw new IllegalArgumentException("There is only one root tablet");
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java
index 4145f58189..649f259fa1 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java
@@ -18,13 +18,23 @@
  */
 package org.apache.accumulo.manager;
 
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class EventCoordinator {
 
   private static final Logger log = 
LoggerFactory.getLogger(EventCoordinator.class);
-  long eventCounter = 0;
+
+  private long eventCounter = 0;
 
   synchronized long waitForEvents(long millis, long lastEvent) {
     // Did something happen since the last time we waited?
@@ -42,20 +52,121 @@ public class EventCoordinator {
     return eventCounter;
   }
 
-  public synchronized void event(String msg, Object... args) {
+  private final Map<Ample.DataLevel,Listener> listeners = new 
EnumMap<>(Ample.DataLevel.class);
+
+  public enum EventScope {
+    ALL, DATA_LEVEL, TABLE, TABLE_RANGE
+  }
+
+  public static class Event {
+
+    private final EventScope scope;
+    private final Ample.DataLevel level;
+    private final KeyExtent extent;
+
+    Event(EventScope scope, KeyExtent extent) {
+      this.scope = scope;
+      this.level = Ample.DataLevel.of(extent.tableId());
+      this.extent = extent;
+    }
+
+    Event(EventScope scope, TableId tableId) {
+      this.scope = scope;
+      this.level = Ample.DataLevel.of(tableId);
+      this.extent = new KeyExtent(tableId, null, null);
+    }
+
+    Event(EventScope scope, Ample.DataLevel level) {
+      this.scope = scope;
+      this.level = level;
+      this.extent = null;
+    }
+
+    Event() {
+      this.scope = EventScope.ALL;
+      this.level = null;
+      this.extent = null;
+    }
+
+    public EventScope getScope() {
+      return scope;
+    }
+
+    public Ample.DataLevel getLevel() {
+      Preconditions.checkState(scope != EventScope.ALL);
+      return level;
+    }
+
+    public TableId getTableId() {
+      Preconditions.checkState(scope == EventScope.TABLE || scope == 
EventScope.TABLE_RANGE);
+      return extent.tableId();
+    }
+
+    public KeyExtent getExtent() {
+      Preconditions.checkState(scope == EventScope.TABLE || scope == 
EventScope.TABLE_RANGE);
+      return extent;
+    }
+  }
+
+  public void event(String msg, Object... args) {
+    log.info(String.format(msg, args));
+    publish(new Event());
+  }
+
+  public void event(Ample.DataLevel level, String msg, Object... args) {
     log.info(String.format(msg, args));
+    publish(new Event(EventScope.DATA_LEVEL, level));
+  }
+
+  public void event(TableId tableId, String msg, Object... args) {
+    log.info(String.format(msg, args));
+    publish(new Event(EventScope.TABLE, tableId));
+  }
+
+  public void event(KeyExtent extent, String msg, Object... args) {
+    log.debug(String.format(msg, args));
+    publish(new Event(EventScope.TABLE_RANGE, extent));
+  }
+
+  public void event(Collection<KeyExtent> extents, String msg, Object... args) 
{
+    if (!extents.isEmpty()) {
+      log.debug(String.format(msg, args));
+      extents.forEach(extent -> publish(new Event(EventScope.TABLE_RANGE, 
extent)));
+    }
+  }
+
+  private synchronized void publish(Event event) {
+    if (event.getScope() == EventScope.ALL) {
+      listeners.values().forEach(listener -> listener.process(event));
+    } else {
+      listeners.getOrDefault(event.getLevel(), e -> {}).process(event);
+    }
+
     eventCounter++;
     notifyAll();
   }
 
-  public Listener getListener() {
-    return new Listener();
+  public interface Listener {
+    void process(Event event);
+  }
+
+  public synchronized void addListener(Ample.DataLevel level, Listener 
listener) {
+    // Currently only expecting one listener for each level, so keeping the 
code simple and
+    // detecting deviations. Can adept if needed.
+    Preconditions.checkState(listeners.put(level, listener) == null);
+  }
+
+  public Tracker getTracker() {
+    return new Tracker();
   }
 
-  public class Listener {
+  /**
+   * Tracks the event counter and helps detect changes in it.
+   */
+  public class Tracker {
     long lastEvent;
 
-    Listener() {
+    Tracker() {
       lastEvent = eventCounter;
     }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0f539e199f..4fbe6bc407 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -543,7 +543,8 @@ public class Manager extends AbstractServer
       }
       mergeLock.notifyAll();
     }
-    nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
+    nextEvent.event(info.getExtent().tableId(), "Merge state of %s set to %s", 
info.getExtent(),
+        state);
   }
 
   public void clearMergeState(TableId tableId) throws KeeperException, 
InterruptedException {
@@ -552,7 +553,7 @@ public class Manager extends AbstractServer
       getContext().getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
       mergeLock.notifyAll();
     }
-    nextEvent.event("Merge state of %s cleared", tableId);
+    nextEvent.event(tableId, "Merge state of %s cleared", tableId);
   }
 
   void setManagerGoalState(ManagerGoalState state) {
@@ -614,7 +615,7 @@ public class Manager extends AbstractServer
       return txids;
     });
 
-    nextEvent.event("Unassignment requested %s", tablet);
+    nextEvent.event(tablet, "Unassignment requested %s", tablet);
   }
 
   public void cancelUnassignmentRequest(KeyExtent tablet, long fateTxid) {
@@ -627,7 +628,7 @@ public class Manager extends AbstractServer
       return v;
     });
 
-    nextEvent.event("Unassignment request canceled %s", tablet);
+    nextEvent.event(tablet, "Unassignment request canceled %s", tablet);
   }
 
   public boolean isUnassignmentRequested(KeyExtent extent) {
@@ -856,7 +857,7 @@ public class Manager extends AbstractServer
 
     @Override
     public void run() {
-      EventCoordinator.Listener eventListener = nextEvent.getListener();
+      EventCoordinator.Tracker eventTracker = nextEvent.getTracker();
       while (stillManager()) {
         long wait = DEFAULT_WAIT_FOR_WATCHER;
         try {
@@ -935,7 +936,7 @@ public class Manager extends AbstractServer
         Span span = TraceUtil.startSpan(this.getClass(), "run::updateStatus");
         try (Scope scope = span.makeCurrent()) {
           wait = updateStatus();
-          eventListener.waitForEvents(wait);
+          eventTracker.waitForEvents(wait);
         } catch (Exception t) {
           TraceUtil.setException(span, t, false);
           log.error("Error balancing tablets, will wait for {} (seconds) and 
then retry ",
@@ -1172,7 +1173,7 @@ public class Manager extends AbstractServer
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
     compactionCoordinator =
-        new CompactionCoordinator(context, tserverSet, security, 
compactionJobQueues);
+        new CompactionCoordinator(context, tserverSet, security, 
compactionJobQueues, nextEvent);
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
     ManagerClientService.Iface haProxy =
@@ -1697,7 +1698,7 @@ public class Manager extends AbstractServer
 
   @Override
   public void stateChanged(TableId tableId, TableState state) {
-    nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, 
state);
+    nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", 
tableId, state);
     if (state == TableState.OFFLINE) {
       clearMigrations(tableId);
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index 89e73c038a..37b6bc9456 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@ -32,7 +32,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -59,8 +58,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.manager.state.TabletManagement;
-import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
@@ -72,10 +69,8 @@ import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.securityImpl.thrift.TDelegationToken;
@@ -306,9 +301,9 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
     }
     if (stopTabletServers) {
       manager.setManagerGoalState(ManagerGoalState.CLEAN_STOP);
-      EventCoordinator.Listener eventListener = 
manager.nextEvent.getListener();
+      EventCoordinator.Tracker eventTracker = manager.nextEvent.getTracker();
       do {
-        eventListener.waitForEvents(Manager.ONE_SECOND);
+        eventTracker.waitForEvents(Manager.ONE_SECOND);
       } while (manager.tserverSet.size() > 0);
     }
     manager.setManagerState(ManagerState.STOP);
@@ -362,10 +357,10 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
         Manager.log.error("{} reports assignment failed for tablet {}", 
serverName, tablet);
         break;
       case LOADED:
-        manager.nextEvent.event("tablet %s was loaded on %s", tablet, 
serverName);
+        manager.nextEvent.event(tablet, "tablet %s was loaded on %s", tablet, 
serverName);
         break;
       case UNLOADED:
-        manager.nextEvent.event("tablet %s was unloaded from %s", tablet, 
serverName);
+        manager.nextEvent.event(tablet, "tablet %s was unloaded from %s", 
tablet, serverName);
         break;
       case UNLOAD_ERROR:
         Manager.log.error("{} reports unload failed for tablet {}", 
serverName, tablet);
@@ -656,23 +651,8 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
       });
     }
 
-    // Register the successful extent updates with the appropriate 
TabletStateStore.
-    // This will bump these tablets to the head of the line and the 
TabletGroupWatcher
-    // will process these updates before the ones returned from scanning the 
underlying
-    // table.
-    if (!success.isEmpty()) {
-      final Set<ManagementAction> actions = 
Set.of(ManagementAction.NEEDS_LOCATION_UPDATE);
-      ample.readTablets().forTablets(success, Optional.empty())
-          .fetch(TabletManagement.CONFIGURED_COLUMNS.toArray(new ColumnType[] 
{})).build()
-          .forEach(tm -> {
-            manager.getTabletStateStore(DataLevel.of(tm.getTableId()))
-                .addTabletStateChange(new TabletManagement(actions, tm));
-          });
-    }
-
-    // this will kick the tablet group watcher into action
-    manager.getEventCoordinator().event("Tablet hosting requested tableId:%s 
extents:%d", tableId,
-        extents.size());
+    manager.getEventCoordinator().event(success, "Tablet hosting requested for 
%d tablets in %s",
+        success.size(), tableId);
   }
 
   protected TableId getTableId(ClientContext context, String tableName)
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index df7fee1e21..627e2ab427 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -38,7 +38,11 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
@@ -65,6 +69,7 @@ import 
org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 import org.apache.accumulo.core.manager.state.TabletManagement;
 import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -92,6 +97,7 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
 import org.apache.accumulo.manager.Manager.TabletGoalState;
 import org.apache.accumulo.manager.split.SplitTask;
@@ -152,12 +158,18 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   private final TabletGroupWatcher dependentWatcher;
   final TableStats stats = new TableStats();
   private SortedSet<TServerInstance> lastScanServers = 
Collections.emptySortedSet();
+  private final EventHandler eventHandler;
+
+  private WalStateManager walStateManager;
 
   TabletGroupWatcher(Manager manager, TabletStateStore store, 
TabletGroupWatcher dependentWatcher) {
     super("Watching " + store.name());
     this.manager = manager;
     this.store = store;
     this.dependentWatcher = dependentWatcher;
+    this.walStateManager = new WalStateManager(manager.getContext());
+    this.eventHandler = new EventHandler();
+    manager.getEventCoordinator().addListener(store.getLevel(), eventHandler);
   }
 
   /** Should this {@code TabletGroupWatcher} suspend tablets? */
@@ -210,273 +222,397 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  @Override
-  public void run() {
-    int[] oldCounts = new int[TabletState.values().length];
-    EventCoordinator.Listener eventListener = 
this.manager.nextEvent.getListener();
+  class EventHandler implements EventCoordinator.Listener {
 
-    WalStateManager wals = new WalStateManager(manager.getContext());
+    // Setting this to true to start with because its not know what happended 
before this object was
+    // created, so just start off with full scan.
+    private boolean needsFullScan = true;
 
-    while (manager.stillManager()) {
-      // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
-      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    private final BlockingQueue<Range> rangesToProcess;
 
-      final long waitTimeBetweenScans = manager.getConfiguration()
-          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
+    class RangeProccessor implements Runnable {
+      @Override
+      public void run() {
+        try {
+          while (manager.stillManager()) {
+            var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS);
+            if (range == null) {
+              // check to see if still the manager
+              continue;
+            }
 
-      int totalUnloaded = 0;
-      int unloaded = 0;
-      ClosableIterator<TabletManagement> iter = null;
-      try {
-        Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
-        Map<TableId,MergeStats> currentMerges = new HashMap<>();
-        for (MergeInfo merge : manager.merges()) {
-          if (merge.getExtent() != null) {
-            currentMerges.put(merge.getExtent().tableId(), new 
MergeStats(merge));
+            ArrayList<Range> ranges = new ArrayList<>();
+            ranges.add(range);
+
+            rangesToProcess.drainTo(ranges);
+
+            if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
+              // only do full scans when trying to shutdown
+              setNeedsFullScan();
+              continue;
+            }
+
+            var currentTservers = getCurrentTservers();
+            if (currentTservers.isEmpty()) {
+              setNeedsFullScan();
+              continue;
+            }
+
+            try (var iter = store.iterator(ranges)) {
+              long t1 = System.currentTimeMillis();
+              manageTablets(iter, currentTservers, false);
+              long t2 = System.currentTimeMillis();
+              Manager.log.debug(String.format("[%s]: partial scan time %.2f 
seconds for %,d ranges",
+                  store.name(), (t2 - t1) / 1000., ranges.size()));
+            } catch (Exception e) {
+              Manager.log.error("Error processing {} ranges for store {} ", 
ranges.size(),
+                  store.name(), e);
+            }
           }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
         }
+      }
+    }
 
-        // Get the current status for the current list of tservers
-        final SortedMap<TServerInstance,TabletServerStatus> currentTServers = 
new TreeMap<>();
-        for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
-          currentTServers.put(entry, manager.tserverStatus.get(entry));
-        }
+    EventHandler() {
+      rangesToProcess = new ArrayBlockingQueue<>(3000);
 
-        if (currentTServers.isEmpty()) {
-          eventListener.waitForEvents(waitTimeBetweenScans);
-          synchronized (this) {
-            lastScanServers = Collections.emptySortedSet();
+      Threads
+          .createThread("TGW [" + store.name() + "] event range processor", 
new RangeProccessor())
+          .start();
+    }
+
+    private synchronized void setNeedsFullScan() {
+      needsFullScan = true;
+      notifyAll();
+    }
+
+    public synchronized void clearNeedsFullScan() {
+      needsFullScan = false;
+    }
+
+    @Override
+    public void process(EventCoordinator.Event event) {
+
+      switch (event.getScope()) {
+        case ALL:
+        case DATA_LEVEL:
+          setNeedsFullScan();
+          break;
+        case TABLE:
+        case TABLE_RANGE:
+          if (!rangesToProcess.offer(event.getExtent().toMetaRange())) {
+            Manager.log.debug("[{}] unable to process event range {} because 
queue is full",
+                store.name(), event.getExtent());
+            setNeedsFullScan();
           }
-          continue;
+          break;
+        default:
+          throw new IllegalArgumentException("Unhandled scope " + 
event.getScope());
+      }
+    }
+
+    synchronized void waitForFullScan(long millis) {
+      if (!needsFullScan) {
+        try {
+          wait(millis);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
         }
+      }
+    }
+  }
 
-        final Map<String,Set<TServerInstance>> currentTServerGrouping =
-            manager.tserverSet.getCurrentServersGroups();
+  private static class TableMgmtStats {
+    int[] counts = new int[TabletState.values().length];
+    private int totalUnloaded;
+  }
 
-        TabletLists tLists = new TabletLists(manager, currentTServers, 
currentTServerGrouping);
+  private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
+      SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean 
isFullScan)
+      throws BadLocationStateException, TException, DistributedStoreException, 
WalMarkerException,
+      IOException {
 
-        ManagerState managerState = manager.getManagerState();
-        int[] counts = new int[TabletState.values().length];
-        stats.begin();
+    TableMgmtStats tableMgmtStats = new TableMgmtStats();
+    int unloaded = 0;
 
-        CompactionJobGenerator compactionGenerator = new 
CompactionJobGenerator(
-            new ServiceEnvironmentImpl(manager.getContext()), 
manager.getCompactionHints());
+    Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
+    Map<TableId,MergeStats> currentMerges = new HashMap<>();
+    for (MergeInfo merge : manager.merges()) {
+      if (merge.getExtent() != null) {
+        currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge));
+      }
+    }
 
-        final Map<TabletServerId,String> resourceGroups = new HashMap<>();
-        manager.tServerResourceGroups().forEach((group, tservers) -> {
-          tservers.stream().map(TabletServerIdImpl::new)
-              .forEach(tabletServerId -> resourceGroups.put(tabletServerId, 
group));
-        });
+    final Map<String,Set<TServerInstance>> currentTServerGrouping =
+        manager.tserverSet.getCurrentServersGroups();
 
-        // Walk through the tablets in our store, and work tablets
-        // towards their goal
-        iter = store.iterator();
-        while (iter.hasNext()) {
-          final TabletManagement mti = iter.next();
-          if (mti == null) {
-            throw new IllegalStateException("State store returned a null 
ManagerTabletInfo object");
-          }
+    TabletLists tLists = new TabletLists(manager, currentTServers, 
currentTServerGrouping);
 
-          final Set<ManagementAction> actions = mti.getActions();
-          final TabletMetadata tm = mti.getTabletMetadata();
+    CompactionJobGenerator compactionGenerator = new CompactionJobGenerator(
+        new ServiceEnvironmentImpl(manager.getContext()), 
manager.getCompactionHints());
 
-          if (tm.isFutureAndCurrentLocationSet()) {
-            throw new BadLocationStateException(
-                tm.getExtent() + " is both assigned and hosted, which should 
never happen: " + this,
-                tm.getExtent().toMetaRow());
-          }
-          if (tm.isOperationIdAndCurrentLocationSet()) {
-            throw new BadLocationStateException(tm.getExtent()
+    final Map<TabletServerId,String> resourceGroups = new HashMap<>();
+    manager.tServerResourceGroups().forEach((group, tservers) -> {
+      tservers.stream().map(TabletServerIdImpl::new)
+          .forEach(tabletServerId -> resourceGroups.put(tabletServerId, 
group));
+    });
+
+    while (iter.hasNext()) {
+      final TabletManagement mti = iter.next();
+      if (mti == null) {
+        throw new IllegalStateException("State store returned a null 
ManagerTabletInfo object");
+      }
+
+      final Set<ManagementAction> actions = mti.getActions();
+      final TabletMetadata tm = mti.getTabletMetadata();
+
+      if (tm.isFutureAndCurrentLocationSet()) {
+        throw new BadLocationStateException(
+            tm.getExtent() + " is both assigned and hosted, which should never 
happen: " + this,
+            tm.getExtent().toMetaRow());
+      }
+      if (tm.isOperationIdAndCurrentLocationSet()) {
+        throw new BadLocationStateException(
+            tm.getExtent()
                 + " has both operation id and current location, which should 
never happen: " + this,
-                tm.getExtent().toMetaRow());
-          }
+            tm.getExtent().toMetaRow());
+      }
 
-          final TableId tableId = tm.getTableId();
-          // ignore entries for tables that do not exist in zookeeper
-          if (manager.getTableManager().getTableState(tableId) == null) {
-            continue;
-          }
+      final TableId tableId = tm.getTableId();
+      // ignore entries for tables that do not exist in zookeeper
+      if (manager.getTableManager().getTableState(tableId) == null) {
+        continue;
+      }
 
-          // Don't overwhelm the tablet servers with work
-          if (tLists.unassigned.size() + unloaded
-              > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-            flushChanges(tLists, wals);
-            tLists.reset();
-            unloaded = 0;
-            eventListener.waitForEvents(waitTimeBetweenScans);
-          }
-          final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
-
-          final MergeStats mergeStats = 
mergeStatsCache.computeIfAbsent(tableId, k -> {
-            var mStats = currentMerges.get(k);
-            return mStats != null ? mStats : new MergeStats(new MergeInfo());
-          });
-          TabletGoalState goal = manager.getGoalState(tm, 
mergeStats.getMergeInfo());
-          TabletState state = TabletState.compute(tm, currentTServers.keySet(),
-              manager.tabletBalancer, resourceGroups);
-
-          final Location location = tm.getLocation();
-          Location current = null;
-          Location future = null;
-          if (tm.hasCurrent()) {
-            current = tm.getLocation();
-          } else {
-            future = tm.getLocation();
-          }
-          TabletLogger.missassigned(tm.getExtent(), goal.toString(), 
state.toString(),
-              future != null ? future.getServerInstance() : null,
-              current != null ? current.getServerInstance() : null, 
tm.getLogs().size());
+      // Don't overwhelm the tablet servers with work
+      if (tLists.unassigned.size() + unloaded
+          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+        flushChanges(tLists);
+        tLists.reset();
+        unloaded = 0;
+      }
 
-          stats.update(tableId, state);
-          mergeStats.update(tm.getExtent(), state);
+      final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
+
+      final MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k 
-> {
+        var mStats = currentMerges.get(k);
+        return mStats != null ? mStats : new MergeStats(new MergeInfo());
+      });
+      TabletGoalState goal = manager.getGoalState(tm, 
mergeStats.getMergeInfo());
+      TabletState state =
+          TabletState.compute(tm, currentTServers.keySet(), 
manager.tabletBalancer, resourceGroups);
+
+      final Location location = tm.getLocation();
+      Location current = null;
+      Location future = null;
+      if (tm.hasCurrent()) {
+        current = tm.getLocation();
+      } else {
+        future = tm.getLocation();
+      }
+      TabletLogger.missassigned(tm.getExtent(), goal.toString(), 
state.toString(),
+          future != null ? future.getServerInstance() : null,
+          current != null ? current.getServerInstance() : null, 
tm.getLogs().size());
 
-          // Always follow through with assignments
-          if (state == TabletState.ASSIGNED) {
+      if (isFullScan) {
+        stats.update(tableId, state);
+      }
+      mergeStats.update(tm.getExtent(), state);
+
+      // Always follow through with assignments
+      if (state == TabletState.ASSIGNED) {
+        goal = TabletGoalState.HOSTED;
+      } else if (state == TabletState.NEEDS_REASSIGNMENT) {
+        goal = TabletGoalState.UNASSIGNED;
+      }
+
+      if (Manager.log.isTraceEnabled()) {
+        Manager.log.trace(
+            "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: 
{}, state: {}, goal: {} actions:{}",
+            store.name(), 
manager.serversToShutdown.equals(currentTServers.keySet()),
+            dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tm.getExtent(),
+            state, goal, actions);
+      }
+
+      // if we are shutting down all the tabletservers, we have to do it in 
order
+      if (isFullScan && (goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED)
+          && manager.serversToShutdown.equals(currentTServers.keySet())) {
+        if (dependentWatcher != null) {
+          // If the dependentWatcher is for the user tables, check to see
+          // that user tables exist.
+          DataLevel dependentLevel = dependentWatcher.store.getLevel();
+          boolean userTablesExist = true;
+          switch (dependentLevel) {
+            case USER:
+              Set<TableId> onlineTables = manager.onlineTables();
+              onlineTables.remove(RootTable.ID);
+              onlineTables.remove(MetadataTable.ID);
+              userTablesExist = !onlineTables.isEmpty();
+              break;
+            case METADATA:
+            case ROOT:
+            default:
+              break;
+          }
+          // If the stats object in the dependentWatcher is empty, then it
+          // currently does not have data about what is hosted or not. In
+          // that case host these tablets until the dependent watcher can
+          // gather some data.
+          final Map<TableId,TableCounts> stats = dependentWatcher.getStats();
+          if (dependentLevel == DataLevel.USER) {
+            if (userTablesExist
+                && (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0)) {
+              goal = TabletGoalState.HOSTED;
+            }
+          } else if (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0) {
             goal = TabletGoalState.HOSTED;
-          } else if (state == TabletState.NEEDS_REASSIGNMENT) {
-            goal = TabletGoalState.UNASSIGNED;
           }
-          if (Manager.log.isTraceEnabled()) {
-            Manager.log.trace(
-                "[{}] Shutting down all Tservers: {}, dependentCount: {} 
Extent: {}, state: {}, goal: {} actions:{}",
-                store.name(), 
manager.serversToShutdown.equals(currentTServers.keySet()),
-                dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(),
-                tm.getExtent(), state, goal, actions);
+        }
+      }
+
+      if (actions.contains(ManagementAction.NEEDS_SPLITTING)) {
+        LOG.debug("{} may need splitting.", tm.getExtent());
+        if (manager.getSplitter().isSplittable(tm)) {
+          if (manager.getSplitter().addSplitStarting(tm.getExtent())) {
+            LOG.debug("submitting tablet {} for split", tm.getExtent());
+            manager.getSplitter().executeSplit(new 
SplitTask(manager.getContext(), tm, manager));
           }
+        } else {
+          LOG.debug("{} is not splittable.", tm.getExtent());
+        }
+        // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this 
commented out code to
+        // show where merge used to make a call to split a tablet.
+        // sendSplitRequest(mergeStats.getMergeInfo(), state, tm);
+      }
 
-          // if we are shutting down all the tabletservers, we have to do it 
in order
-          if ((goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED)
-              && manager.serversToShutdown.equals(currentTServers.keySet())) {
-            if (dependentWatcher != null) {
-              // If the dependentWatcher is for the user tables, check to see
-              // that user tables exist.
-              DataLevel dependentLevel = dependentWatcher.store.getLevel();
-              boolean userTablesExist = true;
-              switch (dependentLevel) {
-                case USER:
-                  Set<TableId> onlineTables = manager.onlineTables();
-                  onlineTables.remove(RootTable.ID);
-                  onlineTables.remove(MetadataTable.ID);
-                  userTablesExist = !onlineTables.isEmpty();
-                  break;
-                case METADATA:
-                case ROOT:
-                default:
-                  break;
-              }
-              // If the stats object in the dependentWatcher is empty, then it
-              // currently does not have data about what is hosted or not. In
-              // that case host these tablets until the dependent watcher can
-              // gather some data.
-              final Map<TableId,TableCounts> stats = 
dependentWatcher.getStats();
-              if (dependentLevel == DataLevel.USER) {
-                if (userTablesExist
-                    && (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0)) {
-                  goal = TabletGoalState.HOSTED;
-                }
-              } else if (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0) {
-                goal = TabletGoalState.HOSTED;
+      if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
+        var jobs = compactionGenerator.generateJobs(tm,
+            TabletManagementIterator.determineCompactionKinds(actions));
+        LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), 
jobs.size());
+        manager.getCompactionQueues().add(tm, jobs);
+      }
+
+      // ELASITICITY_TODO the case where a planner generates compactions at 
time T1 for tablet
+      // and later at time T2 generates nothing for the same tablet is not 
being handled. At
+      // time T1 something could have been queued. However at time T2 we will 
not clear those
+      // entries from the queue because we see nothing here for that case. 
After a full
+      // metadata scan could remove any tablets that were not updated during 
the scan.
+
+      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
+          || actions.contains(ManagementAction.IS_MERGING)) {
+        if (goal == TabletGoalState.HOSTED) {
+          if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty())
+              && manager.recoveryManager.recoverLogs(tm.getExtent(), 
tm.getLogs())) {
+            continue;
+          }
+          switch (state) {
+            case HOSTED:
+              if 
(location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) {
+                manager.migrations.remove(tm.getExtent());
               }
-            }
+              break;
+            case ASSIGNED_TO_DEAD_SERVER:
+              hostDeadTablet(tLists, tm, location);
+              break;
+            case SUSPENDED:
+              hostSuspendedTablet(tLists, tm, location, tableConf);
+              break;
+            case UNASSIGNED:
+              hostUnassignedTablet(tLists, tm.getExtent(),
+                  new UnassignedTablet(location, tm.getLast()));
+              break;
+            case ASSIGNED:
+              // Send another reminder
+              tLists.assigned.add(new Assignment(tm.getExtent(),
+                  future != null ? future.getServerInstance() : null, 
tm.getLast()));
+              break;
+            default:
+              break;
           }
-
-          if (actions.contains(ManagementAction.NEEDS_SPLITTING)) {
-            LOG.debug("{} may need splitting.", tm.getExtent());
-            if (manager.getSplitter().isSplittable(tm)) {
-              if (manager.getSplitter().addSplitStarting(tm.getExtent())) {
-                LOG.debug("submitting tablet {} for split", tm.getExtent());
-                manager.getSplitter()
-                    .executeSplit(new SplitTask(manager.getContext(), tm, 
manager));
+        } else {
+          switch (state) {
+            case SUSPENDED:
+              // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
+              tLists.suspendedToGoneServers.add(tm);
+              cancelOfflineTableMigrations(tm.getExtent());
+              break;
+            case UNASSIGNED:
+              cancelOfflineTableMigrations(tm.getExtent());
+              break;
+            case ASSIGNED_TO_DEAD_SERVER:
+              unassignDeadTablet(tLists, tm);
+              break;
+            case NEEDS_REASSIGNMENT:
+            case HOSTED:
+              TServerConnection client =
+                  
manager.tserverSet.getConnection(location.getServerInstance());
+              if (client != null) {
+                client.unloadTablet(manager.managerLock, tm.getExtent(), 
goal.howUnload(),
+                    manager.getSteadyTime());
+                tableMgmtStats.totalUnloaded++;
+                unloaded++;
+              } else {
+                Manager.log.warn("Could not connect to server {}", location);
               }
-            } else {
-              LOG.debug("{} is not splittable.", tm.getExtent());
-            }
-            // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this 
commented out code to
-            // show where merge used to make a call to split a tablet.
-            // sendSplitRequest(mergeStats.getMergeInfo(), state, tm);
+              break;
+            case ASSIGNED:
+              break;
           }
+        }
+        tableMgmtStats.counts[state.ordinal()]++;
+      }
+    }
 
-          if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
-            var jobs = compactionGenerator.generateJobs(tm,
-                TabletManagementIterator.determineCompactionKinds(actions));
-            LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), 
jobs.size());
-            manager.getCompactionQueues().add(tm, jobs);
-          }
+    flushChanges(tLists);
+    return tableMgmtStats;
+  }
 
-          // ELASITICITY_TODO the case where a planner generates compactions 
at time T1 for tablet
-          // and later at time T2 generates nothing for the same tablet is not 
being handled. At
-          // time T1 something could have been queued. However at time T2 we 
will not clear those
-          // entries from the queue because we see nothing here for that case. 
After a full
-          // metadata scan could remove any tablets that were not updated 
during the scan.
-
-          if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
-              || actions.contains(ManagementAction.IS_MERGING)) {
-            if (goal == TabletGoalState.HOSTED) {
-              if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty())
-                  && manager.recoveryManager.recoverLogs(tm.getExtent(), 
tm.getLogs())) {
-                continue;
-              }
-              switch (state) {
-                case HOSTED:
-                  if 
(location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) {
-                    manager.migrations.remove(tm.getExtent());
-                  }
-                  break;
-                case ASSIGNED_TO_DEAD_SERVER:
-                  hostDeadTablet(tLists, tm, location, wals);
-                  break;
-                case SUSPENDED:
-                  hostSuspendedTablet(tLists, tm, location, tableConf);
-                  break;
-                case UNASSIGNED:
-                  hostUnassignedTablet(tLists, tm.getExtent(),
-                      new UnassignedTablet(location, tm.getLast()));
-                  break;
-                case ASSIGNED:
-                  // Send another reminder
-                  tLists.assigned.add(new Assignment(tm.getExtent(),
-                      future != null ? future.getServerInstance() : null, 
tm.getLast()));
-                  break;
-                default:
-                  break;
-              }
-            } else {
-              switch (state) {
-                case SUSPENDED:
-                  // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
-                  tLists.suspendedToGoneServers.add(tm);
-                  cancelOfflineTableMigrations(tm.getExtent());
-                  break;
-                case UNASSIGNED:
-                  cancelOfflineTableMigrations(tm.getExtent());
-                  break;
-                case ASSIGNED_TO_DEAD_SERVER:
-                  unassignDeadTablet(tLists, tm, wals);
-                  break;
-                case NEEDS_REASSIGNMENT:
-                case HOSTED:
-                  TServerConnection client =
-                      
manager.tserverSet.getConnection(location.getServerInstance());
-                  if (client != null) {
-                    client.unloadTablet(manager.managerLock, tm.getExtent(), 
goal.howUnload(),
-                        manager.getSteadyTime());
-                    unloaded++;
-                    totalUnloaded++;
-                  } else {
-                    Manager.log.warn("Could not connect to server {}", 
location);
-                  }
-                  break;
-                case ASSIGNED:
-                  break;
-              }
-            }
-            counts[state.ordinal()]++;
+  private SortedMap<TServerInstance,TabletServerStatus> getCurrentTservers() {
+    // Get the current status for the current list of tservers
+    final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new 
TreeMap<>();
+    for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
+      currentTServers.put(entry, manager.tserverStatus.get(entry));
+    }
+    return currentTServers;
+  }
+
+  @Override
+  public void run() {
+    int[] oldCounts = new int[TabletState.values().length];
+
+    while (manager.stillManager()) {
+      // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
+      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      // ELASTICITY_TODO above sleep in the case when not doing a full scan to 
make manager more
+      // responsive
+
+      final long waitTimeBetweenScans = manager.getConfiguration()
+          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
+
+      var currentTServers = getCurrentTservers();
+
+      ClosableIterator<TabletManagement> iter = null;
+      try {
+        if (currentTServers.isEmpty()) {
+          eventHandler.waitForFullScan(waitTimeBetweenScans);
+          synchronized (this) {
+            lastScanServers = Collections.emptySortedSet();
           }
+          continue;
         }
 
-        // ELASTICITY_TODO: Add handling for other actions
+        stats.begin();
+
+        ManagerState managerState = manager.getManagerState();
+
+        // Clear the need for a full scan before starting a full scan inorder 
to detect events that
+        // happen during the full scan.
+        eventHandler.clearNeedsFullScan();
 
-        flushChanges(tLists, wals);
+        iter = store.iterator();
+        var tabletMgmtStats = manageTablets(iter, currentTServers, true);
 
         // provide stats after flushing changes to avoid race conditions w/ 
delete table
         stats.end(managerState);
@@ -485,19 +621,21 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         // Report changes
         for (TabletState state : TabletState.values()) {
           int i = state.ordinal();
-          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
-            manager.nextEvent.event("[%s]: %d tablets are %s", store.name(), 
counts[i],
-                state.name());
+          if (tabletMgmtStats.counts[i] > 0 && tabletMgmtStats.counts[i] != 
oldCounts[i]) {
+            manager.nextEvent.event(store.getLevel(), "[%s]: %d tablets are 
%s", store.name(),
+                tabletMgmtStats.counts[i], state.name());
           }
         }
-        Manager.log.debug(String.format("[%s]: scan time %.2f seconds", 
store.name(),
+        Manager.log.debug(String.format("[%s]: full scan time %.2f seconds", 
store.name(),
             stats.getScanTime() / 1000.));
-        oldCounts = counts;
-        if (totalUnloaded > 0) {
-          manager.nextEvent.event("[%s]: %d tablets unloaded", store.name(), 
totalUnloaded);
+        oldCounts = tabletMgmtStats.counts;
+        if (tabletMgmtStats.totalUnloaded > 0) {
+          manager.nextEvent.event(store.getLevel(), "[%s]: %d tablets 
unloaded", store.name(),
+              tabletMgmtStats.totalUnloaded);
         }
 
-        updateMergeState(mergeStatsCache);
+        // TODO
+        // updateMergeState(mergeStatsCache);
 
         synchronized (this) {
           lastScanServers = 
ImmutableSortedSet.copyOf(currentTServers.keySet());
@@ -505,13 +643,15 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         if 
(manager.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
           Manager.log.debug(String.format("[%s] sleeping for %.2f seconds", 
store.name(),
               waitTimeBetweenScans / 1000.));
-          eventListener.waitForEvents(waitTimeBetweenScans);
+          eventHandler.waitForFullScan(waitTimeBetweenScans);
         } else {
-          Manager.log.info("Detected change in current tserver set, re-running 
state machine.");
+          // Create an event at the store level, this will force the next scan 
to be a full scan
+          manager.nextEvent.event(store.getLevel(), "Set of tablet servers 
changed");
         }
       } catch (Exception ex) {
         Manager.log.error("Error processing table state for store " + 
store.name(), ex);
         if (ex.getCause() != null && ex.getCause() instanceof 
BadLocationStateException) {
+          // ELASTICITY_TODO review this function
           repairMetadata(((BadLocationStateException) 
ex.getCause()).getEncodedEndRow());
         } else {
           sleepUninterruptibly(Manager.WAIT_BETWEEN_ERRORS, 
TimeUnit.MILLISECONDS);
@@ -528,12 +668,11 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  private void unassignDeadTablet(TabletLists tLists, TabletMetadata tm, 
WalStateManager wals)
-      throws WalMarkerException {
+  private void unassignDeadTablet(TabletLists tLists, TabletMetadata tm) 
throws WalMarkerException {
     tLists.assignedToDeadServers.add(tm);
     if 
(!tLists.logsForDeadServers.containsKey(tm.getLocation().getServerInstance())) {
       tLists.logsForDeadServers.put(tm.getLocation().getServerInstance(),
-          wals.getWalsInUse(tm.getLocation().getServerInstance()));
+          walStateManager.getWalsInUse(tm.getLocation().getServerInstance()));
     }
   }
 
@@ -582,15 +721,15 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  private void hostDeadTablet(TabletLists tLists, TabletMetadata tm, Location 
location,
-      WalStateManager wals) throws WalMarkerException {
+  private void hostDeadTablet(TabletLists tLists, TabletMetadata tm, Location 
location)
+      throws WalMarkerException {
     tLists.assignedToDeadServers.add(tm);
     if 
(location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) {
       manager.migrations.remove(tm.getExtent());
     }
     TServerInstance tserver = tm.getLocation().getServerInstance();
     if (!tLists.logsForDeadServers.containsKey(tserver)) {
-      tLists.logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
+      tLists.logsForDeadServers.put(tserver, 
walStateManager.getWalsInUse(tserver));
     }
   }
 
@@ -1372,7 +1511,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  private void handleDeadTablets(TabletLists tLists, WalStateManager wals)
+  private void handleDeadTablets(TabletLists tLists)
       throws WalMarkerException, DistributedStoreException {
     var deadTablets = tLists.assignedToDeadServers;
     var deadLogs = tLists.logsForDeadServers;
@@ -1387,8 +1526,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       } else {
         store.unassign(deadTablets, deadLogs);
       }
-      markDeadServerLogsAsClosed(wals, deadLogs);
-      manager.nextEvent.event(
+      markDeadServerLogsAsClosed(walStateManager, deadLogs);
+      manager.nextEvent.event(store.getLevel(),
           "Marked %d tablets as suspended because they don't have current 
servers",
           deadTablets.size());
     }
@@ -1433,13 +1572,25 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  private void flushChanges(TabletLists tLists, WalStateManager wals)
+  private final Lock flushLock = new ReentrantLock();
+
+  private void flushChanges(TabletLists tLists)
       throws DistributedStoreException, TException, WalMarkerException {
     var unassigned = Collections.unmodifiableMap(tLists.unassigned);
 
-    handleDeadTablets(tLists, wals);
-
-    getAssignmentsFromBalancer(tLists, unassigned);
+    flushLock.lock();
+    try {
+      // This method was originally only ever called by one thread. The code 
was modified so that
+      // two threads could possibly call this flush method concurrently. It is 
not clear the
+      // following methods are thread safe so a lock is acquired out of 
caution. Balancer plugins
+      // may not expect multiple threads to call them concurrently, Accumulo 
has not done this in
+      // the past. The log recovery code needs to be evaluated for thread 
safety.
+      handleDeadTablets(tLists);
+
+      getAssignmentsFromBalancer(tLists, unassigned);
+    } finally {
+      flushLock.unlock();
+    }
 
     if (!tLists.assignments.isEmpty()) {
       Manager.log.info(String.format("Assigning %d tablets", 
tLists.assignments.size()));
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 7f0be24740..eb2cf605fd 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -106,6 +106,7 @@ import 
org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.EventCoordinator;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
 import org.apache.accumulo.server.ServerContext;
@@ -161,6 +162,7 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
   private final LiveTServerSet tserverSet;
   private final SecurityOperation security;
   private final CompactionJobQueues jobQueues;
+  private final EventCoordinator eventCoordinator;
   // Exposed for tests
   protected volatile Boolean shutdown = false;
 
@@ -171,12 +173,14 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
   private final Cache<Path,Integer> checked_tablet_dir_cache;
 
   public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
-      SecurityOperation security, CompactionJobQueues jobQueues) {
+      SecurityOperation security, CompactionJobQueues jobQueues,
+      EventCoordinator eventCoordinator) {
     this.ctx = ctx;
     this.tserverSet = tservers;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
     this.jobQueues = jobQueues;
+    this.eventCoordinator = eventCoordinator;
 
     var refreshLatches = new 
EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class);
     refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1));
@@ -788,6 +792,9 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
     // of a coordinator restart when the Coordinator can't find the TServer 
for the
     // corresponding external compaction.
     recordCompletion(ecid);
+
+    // This will causes the tablet to be reexamined to see if it needs any 
more compactions.
+    eventCoordinator.event(extent, "Compaction completed %s", extent);
   }
 
   private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats 
stats,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
index 38025c916e..562c909842 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java
@@ -62,7 +62,7 @@ public class ChangeTableState extends ManagerRepo {
     Utils.unreserveNamespace(env, namespaceId, tid, false);
     Utils.unreserveTable(env, tableId, tid, true);
     LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state 
{} {}", tableId, ts);
-    env.getEventCoordinator().event("Set table state of %s to %s", tableId, 
ts);
+    env.getEventCoordinator().event(tableId, "Set table state of %s to %s", 
tableId, ts);
     return null;
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
index da4a6d6975..3733271049 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java
@@ -62,8 +62,8 @@ class FinishCloneTable extends ManagerRepo {
     Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false);
     Utils.unreserveTable(environment, cloneInfo.tableId, tid, true);
 
-    environment.getEventCoordinator().event("Cloned table %s from %s", 
cloneInfo.tableName,
-        cloneInfo.srcTableId);
+    environment.getEventCoordinator().event(cloneInfo.tableId, "Cloned table 
%s from %s",
+        cloneInfo.tableName, cloneInfo.srcTableId);
 
     LoggerFactory.getLogger(FinishCloneTable.class).debug("Cloned table " + 
cloneInfo.srcTableId
         + " " + cloneInfo.tableId + " " + cloneInfo.tableName);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 0f8491fc5e..7868aee84c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -39,6 +39,7 @@ import 
org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
@@ -146,6 +147,9 @@ class CompactionDriver extends ManagerRepo {
 
       int selected = 0;
 
+      KeyExtent minSelected = null;
+      KeyExtent maxSelected = null;
+
       CompactionConfig config = 
CompactionConfigStorage.getConfig(manager.getContext(), tid);
 
       for (TabletMetadata tablet : tablets) {
@@ -211,6 +215,14 @@ class CompactionDriver extends ManagerRepo {
                 && tabletMetadata.getSelectedFiles().getMetadataValue()
                     .equals(selectedFiles.getMetadataValue()));
 
+            if (minSelected == null || 
tablet.getExtent().compareTo(minSelected) < 0) {
+              minSelected = tablet.getExtent();
+            }
+
+            if (maxSelected == null || 
tablet.getExtent().compareTo(maxSelected) > 0) {
+              maxSelected = tablet.getExtent();
+            }
+
             selected++;
           }
 
@@ -239,10 +251,9 @@ class CompactionDriver extends ManagerRepo {
               result.getExtent()));
 
       if (selected > 0) {
-        // selected files for some tablets, send a notification to get the 
tablet group watcher to
-        // scan for tablets to compact
-        manager.getEventCoordinator().event("%s selected files for compaction 
for %d tablets",
-            FateTxId.formatTid(tid), selected);
+        manager.getEventCoordinator().event(
+            new KeyExtent(tableId, maxSelected.endRow(), 
minSelected.prevEndRow()),
+            "%s selected files for compaction for %d tablets", 
FateTxId.formatTid(tid), selected);
       }
 
       return total - complete;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
index cdd6ad7291..58abc8d844 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/FinishCreateTable.java
@@ -62,7 +62,8 @@ class FinishCreateTable extends ManagerRepo {
     Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
     Utils.unreserveTable(env, tableInfo.getTableId(), tid, true);
 
-    env.getEventCoordinator().event("Created table %s ", 
tableInfo.getTableName());
+    env.getEventCoordinator().event(tableInfo.getTableId(), "Created table %s 
",
+        tableInfo.getTableName());
 
     if (tableInfo.getInitialSplitSize() > 0) {
       cleanupSplitFiles(env);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
index c578f64b89..e8eb5c8761 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
@@ -48,7 +48,7 @@ public class DeleteTable extends ManagerRepo {
   @Override
   public Repo<Manager> call(long tid, Manager env) {
     env.getTableManager().transitionTableState(tableId, TableState.DELETING);
-    env.getEventCoordinator().event("deleting table %s ", tableId);
+    env.getEventCoordinator().event(tableId, "deleting table %s ", tableId);
     return new CleanUp(tableId, namespaceId);
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
index a01539d339..d5ec7f9a88 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -67,6 +67,10 @@ public class DeleteOperationIds extends ManagerRepo {
                 
.stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet()));
       }
 
+      // Get the tablets hosted ASAP if necessary.
+      manager.getEventCoordinator().event(splitInfo.getOriginal(), "Added %d 
splits to %s",
+          splitInfo.getSplits().size(), splitInfo.getOriginal());
+
       TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits());
     }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
index 0df0771ecf..8f55aa3313 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/FinishImportTable.java
@@ -63,7 +63,7 @@ class FinishImportTable extends ManagerRepo {
       Utils.unreserveHdfsDirectory(env, new Path(dm.exportDir).toString(), 
tid);
     }
 
-    env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName);
+    env.getEventCoordinator().event(tableInfo.tableId, "Imported table %s ", 
tableInfo.tableName);
 
     LoggerFactory.getLogger(FinishImportTable.class)
         .debug("Imported table " + tableInfo.tableId + " " + 
tableInfo.tableName);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java
new file mode 100644
index 0000000000..84ab0d0482
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class SplitMillionIT extends AccumuloClusterHarness {
+
+  @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", 
"DMI_RANDOM_USED_ONLY_ONCE"},
+      justification = "predictable random is ok for testing")
+  @Test
+  public void testOneMillionTablets() throws Exception {
+    Logger log = LoggerFactory.getLogger(SplitIT.class);
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+
+      SortedSet<Text> splits = new TreeSet<>();
+
+      for (int i = 100; i < 100_000_000; i += 100) {
+        String split = String.format("%010d", i);
+
+        splits.add(new Text(split));
+
+        if (splits.size() >= 10000) {
+          addSplits(c, tableName, splits, log);
+        }
+      }
+
+      if (!splits.isEmpty()) {
+        addSplits(c, tableName, splits, log);
+      }
+
+      var rows = IntStream
+          .concat(new Random().ints(98, 0, 100_000_000).flatMap(i -> 
IntStream.of(i, i + 1)),
+              IntStream.of(0, 1, 99_999_998, 99_999_999))
+          .toArray();
+
+      // read and write to a few of the 1 million tablets. The following 
should touch the first,
+      // last, and a few middle tablets.
+      for (var rowInt : rows) {
+
+        var row = String.format("%010d", rowInt);
+
+        long t1 = System.currentTimeMillis();
+        try (var scanner = c.createScanner(tableName)) {
+          scanner.setRange(new Range(row));
+          assertEquals(0, scanner.stream().count());
+        }
+
+        long t2 = System.currentTimeMillis();
+
+        try (var writer = c.createBatchWriter(tableName)) {
+          Mutation m = new Mutation(row);
+          m.put("c", "x", "200");
+          m.put("c", "y", "900");
+          m.put("c", "z", "300");
+          writer.addMutation(m);
+        }
+
+        long t3 = System.currentTimeMillis();
+
+        try (var scanner = c.createScanner(tableName)) {
+          scanner.setRange(new Range(row));
+          Map<String,String> coords = 
scanner.stream().collect(Collectors.toMap(
+              e -> e.getKey().getColumnQualifier().toString(), e -> 
e.getValue().toString()));
+          assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords);
+        }
+
+        long t4 = System.currentTimeMillis();
+        log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, 
t3 - t2, t4 - t3);
+      }
+
+      long t1 = System.currentTimeMillis();
+      long count = c.tableOperations().getTabletInformation(tableName, new 
Range()).count();
+      long t2 = System.currentTimeMillis();
+      assertEquals(1_000_000, count);
+      log.info("Time to scan all tablets : {}ms", t2 - t1);
+
+      t1 = System.currentTimeMillis();
+      c.tableOperations().delete(tableName);
+      t2 = System.currentTimeMillis();
+      log.info("Time to delete table : {}ms", t2 - t1);
+
+    }
+  }
+
+  private static void addSplits(AccumuloClient c, String tableName, 
SortedSet<Text> splits,
+      Logger log) throws Exception {
+    long t1 = System.currentTimeMillis();
+    c.tableOperations().addSplits(tableName, splits);
+    long t2 = System.currentTimeMillis();
+    log.info("Added {} splits in {}ms", splits.size(), t2 - t1);
+    splits.clear();
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
deleted file mode 100644
index 11b4e58ac9..0000000000
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementScannerIT.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.functional;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.IntStream;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.admin.Locations;
-import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
-import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.manager.state.TabletManagement;
-import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader;
-import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
-import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.manager.state.TabletManagementScanner;
-import org.apache.accumulo.test.util.Wait;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.Iterators;
-
-public class TabletManagementScannerIT extends SharedMiniClusterBase {
-
-  public static class TMSIT_Config implements MiniClusterConfigurationCallback 
{
-
-    @Override
-    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
-      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
-      cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "10");
-      cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10");
-      cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "180");
-      cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10s");
-      cfg.setProperty(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD, 
"15");
-    }
-  }
-
-  @Override
-  protected Duration defaultTimeout() {
-    return Duration.ofMinutes(2);
-  }
-
-  @BeforeAll
-  public static void beforeAll() throws Exception {
-    SharedMiniClusterBase.startMiniClusterWithConfig(new TMSIT_Config());
-  }
-
-  @AfterAll
-  public static void afterAll() throws Exception {
-    SharedMiniClusterBase.stopMiniCluster();
-  }
-
-  @Test
-  public void testKnownTabletModificationsTakePriority() throws Exception {
-
-    final ConcurrentLinkedQueue<TabletManagement> mods = new 
ConcurrentLinkedQueue<>();
-
-    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
-
-      // Confirm that the root and metadata tables are hosted
-      Locations rootLocations =
-          c.tableOperations().locate(RootTable.NAME, 
Collections.singletonList(new Range()));
-      rootLocations.groupByTablet().keySet()
-          .forEach(tid -> assertNotNull(rootLocations.getTabletLocation(tid)));
-
-      Locations metadataLocations =
-          c.tableOperations().locate(MetadataTable.NAME, 
Collections.singletonList(new Range()));
-      metadataLocations.groupByTablet().keySet()
-          .forEach(tid -> 
assertNotNull(metadataLocations.getTabletLocation(tid)));
-
-      String tableName = super.getUniqueNames(1)[0];
-      NewTableConfiguration ntc = new NewTableConfiguration();
-      SortedSet<Text> splits = new TreeSet<>();
-      IntStream.range(97, 122).forEach((i) -> {
-        splits.add(new Text(String.valueOf((char) i)));
-      });
-      ntc.withSplits(splits);
-      c.tableOperations().create(tableName, ntc);
-
-      TableId tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
-
-      // wait for the tablets to exist in the metadata table. The tablets
-      // will not be hosted so the current location will be empty. Passing null
-      // to the TabletManagementScanner should cause all tablets to be returned
-      // by the TabletManagementIterator.
-      try (TabletManagementScanner s = new 
TabletManagementScanner((ClientContext) c,
-          new Range(TabletsSection.getRange(tableId)), null, 
MetadataTable.NAME, mods)) {
-        Wait.waitFor(() -> Iterators.size(s) == 26, 10_000, 250);
-      }
-
-      // set the tablet hosting goal on the first half of the tablets
-      // so they will need a location update action
-      c.tableOperations().setTabletHostingGoal(tableName, new Range(null, "m"),
-          TabletHostingGoal.ALWAYS);
-
-      // let's add a known modification for the last tablet
-      TabletsMetadata tms = 
getCluster().getServerContext().getAmple().readTablets()
-          .forTablet(new KeyExtent(tableId, null, new Text("y")))
-          .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, 
ColumnType.LOGS,
-              ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, 
ColumnType.FILES,
-              ColumnType.LAST, ColumnType.OPID)
-          .build();
-      assertEquals(1, Iterators.size(tms.iterator()));
-      TabletMetadata tm = tms.iterator().next();
-      mods.add(new 
TabletManagement(EnumSet.of(ManagementAction.NEEDS_SPLITTING), tm));
-
-      // check the contents of the TabletManagementScanner
-      try (TabletManagementScanner s = new 
TabletManagementScanner((ClientContext) c,
-          new Range(TabletsSection.getRange(tableId)), null, 
MetadataTable.NAME, mods)) {
-
-        @SuppressWarnings("resource")
-        Iterator<TabletManagement> iter = s;
-        // Confirm that the first object is the one we added to the known mods 
set.
-        assertFalse(mods.isEmpty());
-        assertTrue(iter.hasNext());
-        TabletManagement tman = iter.next();
-        assertEquals(1, tman.getActions().size());
-        
assertTrue(tman.getActions().contains(ManagementAction.NEEDS_SPLITTING));
-        assertEquals(tm, tman.getTabletMetadata());
-
-        assertTrue(mods.isEmpty());
-
-        // The TabletManagementScanner is going to return all of the tablets 
because
-        // we passed null for the state. Confirm that the hosting goal is 
ALWAYS for
-        // for the first 13 and ondemand for the latter half. When running in 
the Manager
-        // the latter half of the results would not be returned.
-        for (int i = 0; i < 26; i++) {
-          assertTrue(iter.hasNext());
-          tman = iter.next();
-          assertEquals(1, tman.getActions().size());
-          
assertTrue(tman.getActions().contains(ManagementAction.NEEDS_LOCATION_UPDATE));
-          if (i < 13) {
-            assertEquals(TabletHostingGoal.ALWAYS, 
tman.getTabletMetadata().getHostingGoal());
-          } else {
-            assertEquals(TabletHostingGoal.ONDEMAND, 
tman.getTabletMetadata().getHostingGoal());
-          }
-        }
-
-        assertFalse(iter.hasNext());
-      }
-
-    }
-  }
-
-}


Reply via email to