This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f1c37c4028 Enable auto merging of tablets (#5353)
f1c37c4028 is described below

commit f1c37c402881154ae8412d9171e59d225663488a
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sun Mar 16 15:37:48 2025 -0400

    Enable auto merging of tablets (#5353)
    
    This change adds support for periodically scanning tables to find ranges
    of tablets that can be automatically merged. The thread that runs uses
    the recently added TabletMergeability column as well as total file
    counts/sizes to compute if a range can be merged. By default, tablets
    can be merged if the total size is less than 25% of the split threshold
    so we don't immediately merge only to split again.
    
    There is a new thread in the Manager class that performs the computation
    and will submit fate jobs for merging on tablet ranges. There is a new
    fate operation during merge that will validate the range is still ok to
    merge if the merge was submitted as a system merge.
    
    This closes #5014
---
 .../core/client/admin/InstanceOperations.java      |  11 +
 .../core/clientImpl/InstanceOperationsImpl.java    |   6 +
 .../org/apache/accumulo/core/conf/Property.java    |   8 +
 .../java/org/apache/accumulo/core/fate/Fate.java   |   3 +-
 .../org/apache/accumulo/core/fate/FateKey.java     |   8 +-
 .../server/init/FileSystemInitializer.java         |  42 +-
 .../accumulo/manager/FateServiceHandler.java       |   3 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   7 +
 .../manager/merge/FindMergeableRangeTask.java      | 293 +++++++++++++
 .../manager/tableOps/merge/CountFiles.java         |  17 +-
 .../accumulo/manager/tableOps/merge/MergeInfo.java |  10 +-
 .../manager/tableOps/merge/ReserveTablets.java     |  10 +-
 .../manager/tableOps/merge/TableRangeOp.java       |   3 +-
 .../tableOps/merge/UnreserveSystemMerge.java       |  80 ++++
 .../{CountFiles.java => VerifyMergeability.java}   |  67 ++-
 .../accumulo/manager/upgrade/Upgrader11to12.java   |   7 +-
 .../accumulo/manager/upgrade/Upgrader12to13.java   |   6 +-
 .../apache/accumulo/test/ample/TestAmpleUtil.java  |  13 +
 .../accumulo/test/ample/metadata/TestAmple.java    |   5 +-
 .../org/apache/accumulo/test/fate/FateStoreIT.java |  36 +-
 .../apache/accumulo/test/fate/ManagerRepoIT.java   | 114 +++++-
 .../accumulo/test/functional/AddSplitIT.java       |  16 +-
 .../test/functional/CreateInitialSplitsIT.java     |   4 +-
 .../apache/accumulo/test/functional/SplitIT.java   |   6 +-
 .../test/functional/TabletMergeabilityIT.java      | 451 +++++++++++++++++++++
 25 files changed, 1135 insertions(+), 91 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 8545cd3898..739fc3ee70 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.core.client.admin;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -357,4 +358,14 @@ public interface InstanceOperations {
    * @since 2.1.0
    */
   InstanceId getInstanceId();
+
+  /**
+   * Return the current manager time. This duration represents the amount of 
time an Accumulo
+   * manager process has been running. The duration is persisted and should 
only increase over the
+   * lifetime of an Accumulo instance.
+   *
+   * @return current time
+   * @since 4.0.0
+   */
+  Duration getManagerTime() throws AccumuloException, 
AccumuloSecurityException;
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 7c22b7d8a8..fb2c5878f8 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -470,6 +470,12 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
     return context.getInstanceID();
   }
 
+  @Override
+  public Duration getManagerTime() throws AccumuloException, 
AccumuloSecurityException {
+    return Duration.ofNanos(ThriftClientTypes.MANAGER.execute(context,
+        client -> client.getManagerTimeNanos(TraceUtil.traceInfo(), 
context.rpcCreds())));
+  }
+
   @Override
   public ServerId getServer(ServerId.Type type, String resourceGroup, String 
host, int port) {
     Objects.requireNonNull(type, "type parameter cannot be null");
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 4f3b8fb984..3be26b2f19 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -401,6 +401,11 @@ public enum Property {
           + " are performed (e.g. Bulk Import). This property specifies the 
maximum number of threads in a"
           + " ThreadPool in the Manager that will be used to request these 
refresh operations.",
       "4.0.0"),
+  MANAGER_TABLET_MERGEABILITY_INTERVAL("manager.tablet.mergeability.interval", 
"24h",
+      PropertyType.TIMEDURATION,
+      "Time to wait between scanning tables to identify ranges of tablets that 
can be "
+          + " auto-merged. Valid ranges will be have merge fate ops 
submitted.",
+      "4.0.0"),
   MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
       "The time to wait for a tablet server to process a bulk import 
request.", "1.4.3"),
   MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", 
PropertyType.COUNT,
@@ -906,6 +911,9 @@ public enum Property {
   TABLE_ONDEMAND_UNLOADER("tserver.ondemand.tablet.unloader",
       "org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader", 
PropertyType.CLASSNAME,
       "The class that will be used to determine which on-demand Tablets to 
unload.", "4.0.0"),
+  TABLE_MAX_MERGEABILITY_THRESHOLD("table.mergeability.threshold", ".25", 
PropertyType.FRACTION,
+      "A range of tablets are eligible for automatic merging until the 
combined size of RFiles reaches this percentage of the split threshold.",
+      "4.0.0"),
 
   // Crypto-related properties
   @Experimental
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index dd521e3a0c..28a92d4204 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -106,6 +106,7 @@ public class Fate<T> {
     NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
     SHUTDOWN_TSERVER(null),
     SYSTEM_SPLIT(null),
+    SYSTEM_MERGE(null),
     TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
     TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
     TABLE_CLONE(TFateOperation.TABLE_CLONE),
@@ -124,7 +125,7 @@ public class Fate<T> {
 
     private final TFateOperation top;
     private static final EnumSet<FateOperation> nonThriftOps =
-        EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT);
+        EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, 
SYSTEM_MERGE);
 
     FateOperation(TFateOperation top) {
       this.top = top;
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
index 8942149a6f..017ac7955c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
@@ -119,8 +119,12 @@ public class FateKey {
     return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId);
   }
 
+  public static FateKey forMerge(KeyExtent extent) {
+    return new FateKey(FateKeyType.MERGE, extent);
+  }
+
   public enum FateKeyType {
-    SPLIT, COMPACTION_COMMIT
+    SPLIT, COMPACTION_COMMIT, MERGE
   }
 
   private static byte[] serialize(FateKeyType type, KeyExtent ke) {
@@ -151,6 +155,7 @@ public class FateKey {
       throws IOException {
     switch (type) {
       case SPLIT:
+      case MERGE:
         return Optional.of(KeyExtent.readFrom(buffer));
       case COMPACTION_COMMIT:
         return Optional.empty();
@@ -163,6 +168,7 @@ public class FateKey {
       DataInputBuffer buffer) throws IOException {
     switch (type) {
       case SPLIT:
+      case MERGE:
         return Optional.empty();
       case COMPACTION_COMMIT:
         return Optional.of(ExternalCompactionId.of(buffer.readUTF()));
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
 
b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
index ef6536da09..e1801b6eca 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
@@ -20,8 +20,10 @@ package org.apache.accumulo.server.init;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
@@ -48,6 +50,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.store.TablePropKey;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
@@ -80,12 +83,15 @@ public class FileSystemInitializer {
     final Text endRow;
     final Text extent;
     final String[] files;
+    final TabletMergeabilityMetadata mergeability;
 
-    InitialTablet(TableId tableId, String dirName, Text prevEndRow, Text 
endRow, String... files) {
+    InitialTablet(TableId tableId, String dirName, Text prevEndRow, Text 
endRow,
+        TabletMergeabilityMetadata mergeability, String... files) {
       this.tableId = tableId;
       this.dirName = dirName;
       this.prevEndRow = prevEndRow;
       this.endRow = endRow;
+      this.mergeability = Objects.requireNonNull(mergeability);
       this.files = files;
       this.extent = new 
Text(MetadataSchema.TabletsSection.encodeRow(this.tableId, this.endRow));
     }
@@ -94,8 +100,8 @@ public class FileSystemInitializer {
       KeyExtent keyExtent = new KeyExtent(tableId, endRow, prevEndRow);
       var builder = TabletMetadata.builder(keyExtent).putDirName(dirName)
           .putTime(new MetadataTime(0, TimeType.LOGICAL))
-          .putTabletAvailability(TabletAvailability.HOSTED)
-          
.putTabletMergeability(TabletMergeabilityMetadata.never()).putPrevEndRow(prevEndRow);
+          
.putTabletAvailability(TabletAvailability.HOSTED).putTabletMergeability(mergeability)
+          .putPrevEndRow(prevEndRow);
       for (String file : files) {
         builder.putFile(new ReferencedTabletFile(new Path(file)).insert(), new 
DataFileValue(0, 0));
       }
@@ -152,8 +158,12 @@ public class FileSystemInitializer {
     createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri, 
defaultMetadataTabletDirUri,
         fateTableDefaultTabletDirUri, scanRefTableDefaultTabletDirUri);
 
-    InitialTablet fateTablet = createFateRefTablet(context);
-    InitialTablet scanRefTablet = createScanRefTablet(context);
+    // For a new system mark the fate tablet and scan ref tablet as always 
mergeable.
+    // Because this is a new system we can just use 0 for the time as that is 
what the Manager
+    // will initialize with when starting
+    var always = 
TabletMergeabilityMetadata.always(SteadyTime.from(Duration.ZERO));
+    InitialTablet fateTablet = createFateRefTablet(context, always);
+    InitialTablet scanRefTablet = createScanRefTablet(context, always);
 
     // populate the metadata tablet with info about the fate and scan ref 
tablets
     String ext = 
FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
@@ -161,11 +171,13 @@ public class FileSystemInitializer {
     createMetadataFile(fs, metadataFileName, fateTablet, scanRefTablet);
 
     // populate the root tablet with info about the metadata table's two 
initial tablets
-    InitialTablet tablesTablet =
-        new InitialTablet(AccumuloTable.METADATA.tableId(), 
TABLE_TABLETS_TABLET_DIR, null,
-            SPLIT_POINT, StoredTabletFile.of(new 
Path(metadataFileName)).getMetadataPath());
+    // For the default tablet we want to make that mergeable, but don't make 
the TabletsSection
+    // tablet mergeable. This will prevent the two tablets from being 
auto merged with each other
+    InitialTablet tablesTablet = new 
InitialTablet(AccumuloTable.METADATA.tableId(),
+        TABLE_TABLETS_TABLET_DIR, null, SPLIT_POINT, 
TabletMergeabilityMetadata.never(),
+        StoredTabletFile.of(new Path(metadataFileName)).getMetadataPath());
     InitialTablet defaultTablet = new 
InitialTablet(AccumuloTable.METADATA.tableId(),
-        defaultMetadataTabletDirName, SPLIT_POINT, null);
+        defaultMetadataTabletDirName, SPLIT_POINT, null, always);
     createMetadataFile(fs, rootTabletFileUri, tablesTablet, defaultTablet);
   }
 
@@ -231,18 +243,22 @@ public class FileSystemInitializer {
     tabletWriter.close();
   }
 
-  public InitialTablet createScanRefTablet(ServerContext context) throws 
IOException {
+  public InitialTablet createScanRefTablet(ServerContext context,
+      TabletMergeabilityMetadata mergeability) throws IOException {
     setTableProperties(context, AccumuloTable.SCAN_REF.tableId(), 
initConfig.getScanRefTableConf());
 
     return new InitialTablet(AccumuloTable.SCAN_REF.tableId(),
-        
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, null, 
null);
+        
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, null, 
null,
+        mergeability);
   }
 
-  public InitialTablet createFateRefTablet(ServerContext context) throws 
IOException {
+  public InitialTablet createFateRefTablet(ServerContext context,
+      TabletMergeabilityMetadata mergeability) throws IOException {
     setTableProperties(context, AccumuloTable.FATE.tableId(), 
initConfig.getFateTableConf());
 
     return new InitialTablet(AccumuloTable.FATE.tableId(),
-        
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, null, 
null);
+        
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, null, 
null,
+        mergeability);
   }
 
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index 4efb602520..0dac7412cf 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -258,7 +258,8 @@ class FateServiceHandler implements FateService.Iface {
         manager.fate(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, 
timeType, options,
                 splitsPath, splitCount, splitsDirsPath, initialTableState,
-                initialTabletAvailability, namespaceId, 
TabletMergeability.never())),
+                // Set the default tablet to be auto-mergeable with other 
tablets if it is split
+                initialTabletAvailability, namespaceId, 
TabletMergeability.always())),
             autoCleanup, goalMessage);
 
         break;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index a3eb271b37..f5fc39fda0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -123,6 +123,7 @@ import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.core.zookeeper.ZcStat;
 import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
+import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
 import org.apache.accumulo.manager.metrics.BalancerMetrics;
 import org.apache.accumulo.manager.metrics.ManagerMetrics;
 import org.apache.accumulo.manager.recovery.RecoveryManager;
@@ -1372,6 +1373,12 @@ public class Manager extends AbstractServer
     ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
         .scheduleWithFixedDelay(() -> 
ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
 
+    var tabletMergeabilityInterval =
+        
getConfiguration().getDuration(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL);
+    
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+        new FindMergeableRangeTask(this), 
tabletMergeabilityInterval.toMillis(),
+        tabletMergeabilityInterval.toMillis(), MILLISECONDS));
+
     // Make sure that we have a secret key (either a new one or an old one 
from ZK) before we start
     // the manager client service.
     Thread authenticationTokenKeyManagerThread = null;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
new file mode 100644
index 0000000000..a55fded16d
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.merge;
+
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_FILE_COUNT;
+import static 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_TOTAL_SIZE;
+import static 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.NOT_CONTIGUOUS;
+import static 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.TABLET_MERGEABILITY;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Fate.FateOperation;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
+import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * This task is used to scan tables to find ranges of tablets that can be 
merged together
+ * automatically.
+ */
+public class FindMergeableRangeTask implements Runnable {
+
+  private static final Logger log = 
LoggerFactory.getLogger(FindMergeableRangeTask.class);
+
+  private static final TabletMergeabilityFilter FILTER = new 
TabletMergeabilityFilter();
+
+  private final Manager manager;
+
+  public FindMergeableRangeTask(Manager manager) {
+    this.manager = Objects.requireNonNull(manager);
+    log.debug("Creating FindMergeableRangeTask");
+  }
+
+  @Override
+  public void run() {
+    var context = manager.getContext();
+    Map<TableId,String> tables = context.getTableIdToNameMap();
+
+    log.debug("Starting FindMergeableRangeTask");
+
+    for (Entry<TableId,String> table : tables.entrySet()) {
+      TableId tableId = table.getKey();
+      String tableName = table.getValue();
+
+      // Read the table configuration to compute the max total file size of a 
mergeable range.
+      // The max size is a percentage of the configured split threshold and we 
do not want
+      // to exceed this limit.
+      long threshold =
+          
context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+      double mergeabilityThreshold = context.getTableConfiguration(tableId)
+          .getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD);
+      if (mergeabilityThreshold <= 0) {
+        log.trace("Skipping FindMergeableRangeTask for table {}, {}} is set to 
{}", tableName,
+            Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), 
mergeabilityThreshold);
+        continue;
+      }
+
+      long maxFileCount =
+          
context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX);
+      long maxTotalSize = (long) (threshold * mergeabilityThreshold);
+
+      log.debug("Checking {} for tablets that can be merged", tableName);
+      log.trace("maxFileCount: {}, maxTotalSize:{}, splitThreshold:{}, 
mergeabilityThreshold:{}",
+          maxFileCount, maxTotalSize, threshold, mergeabilityThreshold);
+      try {
+        NamespaceId namespaceId = context.getNamespaceId(tableId);
+        var type = FateInstanceType.fromTableId(tableId);
+
+        try (var tablets = context.getAmple().readTablets().forTable(tableId)
+            .fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) {
+
+          final MergeableRange current =
+              new MergeableRange(tableId, manager.getSteadyTime(), 
maxFileCount, maxTotalSize);
+
+          for (var tm : tablets) {
+            log.trace("Checking tablet {}, {}", tm.getExtent(), 
tm.getTabletMergeability());
+            // If there was an error adding the next tablet to the range then
+            // the existing range is complete as we can't add more tablets so
+            // submit a merge fate op and reset to find more merge ranges
+            current.add(tm).ifPresent(error -> {
+              submit(current, type, table, namespaceId);
+              current.resetAndAdd(tm);
+            });
+          }
+
+          // Try and submit any outstanding mergeable tablets
+          submit(current, type, table, namespaceId);
+        }
+
+      } catch (Exception e) {
+        log.error("Failed to generate system merges for {}", tableName, e);
+      }
+    }
+
+  }
+
+  void submit(MergeableRange range, FateInstanceType type, 
Entry<TableId,String> table,
+      NamespaceId namespaceId) {
+    if (range.tabletCount < 2) {
+      return;
+    }
+
+    TableId tableId = table.getKey();
+    String tableName = table.getValue();
+
+    log.debug("Table {} found {} tablets that can be merged for table", 
tableName,
+        range.tabletCount);
+
+    final Text startRow = range.startRow != null ? range.startRow : new 
Text("");
+    final Text endRow = range.endRow != null ? range.endRow : new Text("");
+
+    final String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), 
"-inf");
+    final String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), 
"+inf");
+    log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {} 
to endRow: {}",
+        tableId, startRowStr, endRowStr);
+    var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, 
range.startRow));
+
+    manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey,
+        new TraceRepo<>(
+            new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, 
startRow, endRow)),
+        true);
+  }
+
+  public enum UnmergeableReason {
+    NOT_CONTIGUOUS, MAX_FILE_COUNT, MAX_TOTAL_SIZE, TABLET_MERGEABILITY;
+
+    // Cache the Optional() reason objects as we will re-use these over and 
over
+    private static final Optional<UnmergeableReason> NOT_CONTIGUOUS_OPT =
+        Optional.of(NOT_CONTIGUOUS);
+    private static final Optional<UnmergeableReason> MAX_FILE_COUNT_OPT =
+        Optional.of(MAX_FILE_COUNT);
+    private static final Optional<UnmergeableReason> MAX_TOTAL_SIZE_OPT =
+        Optional.of(MAX_TOTAL_SIZE);
+    private static final Optional<UnmergeableReason> TABLET_MERGEABILITY_OPT =
+        Optional.of(TABLET_MERGEABILITY);
+
+    public Optional<UnmergeableReason> optional() {
+      switch (this) {
+        case NOT_CONTIGUOUS:
+          return NOT_CONTIGUOUS_OPT;
+        case MAX_FILE_COUNT:
+          return MAX_FILE_COUNT_OPT;
+        case MAX_TOTAL_SIZE:
+          return MAX_TOTAL_SIZE_OPT;
+        case TABLET_MERGEABILITY:
+          return TABLET_MERGEABILITY_OPT;
+        default:
+          throw new IllegalArgumentException("Unexpected enum type");
+      }
+    }
+  }
+
+  public static class MergeableRange {
+    final SteadyTime currentTime;
+    final TableId tableId;
+    final long maxFileCount;
+    final long maxTotalSize;
+
+    Text startRow;
+    Text endRow;
+    int tabletCount;
+    long totalFileCount;
+    long totalFileSize;
+
+    public MergeableRange(TableId tableId, SteadyTime currentTime, long 
maxFileCount,
+        long maxTotalSize) {
+      this.tableId = tableId;
+      this.currentTime = currentTime;
+      this.maxFileCount = maxFileCount;
+      this.maxTotalSize = maxTotalSize;
+    }
+
+    public Optional<UnmergeableReason> add(TabletMetadata tm) {
+      var failure = validate(tm);
+      if (failure.isEmpty()) {
+        tabletCount++;
+        log.trace("Adding tablet {} to MergeableRange", tm.getExtent());
+        if (tabletCount == 1) {
+          startRow = tm.getPrevEndRow();
+        }
+        endRow = tm.getEndRow();
+        totalFileCount += tm.getFiles().size();
+        totalFileSize += tm.getFileSize();
+      }
+      return failure;
+    }
+
+    private Optional<UnmergeableReason> validate(TabletMetadata tm) {
+      Preconditions.checkArgument(tableId.equals(tm.getTableId()), "Unexpected 
tableId seen %s",
+          tm.getTableId());
+
+      if (tabletCount > 0) {
+        // This is at least the second tablet seen so there should not be a 
null prevEndRow
+        Preconditions.checkState(tm.getPrevEndRow() != null,
+            "Unexpected null prevEndRow found for %s", tm.getExtent());
+        // If this is not the first tablet, then verify its prevEndRow matches
+        // the last endRow tracked, the server filter will skip tablets marked 
as never
+        if (!tm.getPrevEndRow().equals(endRow)) {
+          return NOT_CONTIGUOUS.optional();
+        }
+      }
+
+      if (!tm.getTabletMergeability().isMergeable(currentTime)) {
+        return TABLET_MERGEABILITY.optional();
+      }
+
+      if (totalFileCount + tm.getFiles().size() > maxFileCount) {
+        return MAX_FILE_COUNT.optional();
+      }
+
+      if (totalFileSize + tm.getFileSize() > maxTotalSize) {
+        return MAX_TOTAL_SIZE.optional();
+      }
+
+      return Optional.empty();
+    }
+
+    void resetAndAdd(TabletMetadata tm) {
+      reset();
+      add(tm);
+    }
+
+    void reset() {
+      startRow = null;
+      endRow = null;
+      tabletCount = 0;
+      totalFileCount = 0;
+      totalFileSize = 0;
+    }
+  }
+
+  // Filter out never merge tablets to cut down on what we need to check
+  // We need steady time to check other tablets which is not available in the 
filter
+  public static class TabletMergeabilityFilter extends TabletMetadataFilter {
+
+    public static final Set<ColumnType> COLUMNS = 
Sets.immutableEnumSet(MERGEABILITY);
+
+    private final static Predicate<TabletMetadata> IS_NOT_NEVER =
+        tabletMetadata -> !tabletMetadata.getTabletMergeability().isNever();
+
+    @Override
+    public Set<TabletMetadata.ColumnType> getColumns() {
+      return COLUMNS;
+    }
+
+    @Override
+    protected Predicate<TabletMetadata> acceptTablet() {
+      return IS_NOT_NEVER;
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
index 4083eecc86..f108419a51 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.manager.tableOps.merge;
 
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.FateId;
@@ -28,6 +29,8 @@ import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class CountFiles extends ManagerRepo {
   private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
   private static final long serialVersionUID = 1L;
@@ -39,6 +42,9 @@ public class CountFiles extends ManagerRepo {
 
   @Override
   public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
+    // A SYSTEM_MERGE should be executing the VerifyMergeability repo, which
+    // already counts files
+    Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", 
SYSTEM_MERGE);
 
     var range = data.getReserveExtent();
 
@@ -75,10 +81,13 @@ public class CountFiles extends ManagerRepo {
     if (totalFiles >= maxFiles) {
       return new UnreserveAndError(data, totalFiles, maxFiles);
     } else {
-      if (data.op == MergeInfo.Operation.MERGE) {
-        return new MergeTablets(data);
-      } else {
-        return new DeleteRows(data);
+      switch (data.op) {
+        case MERGE:
+          return new MergeTablets(data);
+        case DELETE:
+          return new DeleteRows(data);
+        default:
+          throw new IllegalStateException("Unknown op " + data.op);
       }
     }
   }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
index 0da5159b65..6b84e16e7f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.manager.tableOps.merge;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -37,7 +38,11 @@ public class MergeInfo implements Serializable {
   private static final long serialVersionUID = 1L;
 
   public enum Operation {
-    MERGE, DELETE,
+    MERGE, SYSTEM_MERGE, DELETE;
+
+    public boolean isMergeOp() {
+      return this == MERGE || this == SYSTEM_MERGE;
+    }
   }
 
   final TableId tableId;
@@ -60,7 +65,7 @@ public class MergeInfo implements Serializable {
     this.namespaceId = namespaceId;
     this.startRow = startRow;
     this.endRow = endRow;
-    this.op = op;
+    this.op = Objects.requireNonNull(op);
     this.mergeRangeSet = mergeRange != null;
     if (mergeRange != null) {
       mergeStartRow =
@@ -102,6 +107,7 @@ public class MergeInfo implements Serializable {
   public KeyExtent getReserveExtent() {
     switch (op) {
       case MERGE:
+      case SYSTEM_MERGE:
         return getOriginalExtent();
       case DELETE: {
         if (endRow == null) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
index ffaa6adcd2..9a8a4c3dc0 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java
@@ -128,6 +128,14 @@ public class ReserveTablets extends ManagerRepo {
 
   @Override
   public Repo<Manager> call(FateId fateId, Manager environment) throws 
Exception {
-    return new CountFiles(data);
+    switch (data.op) {
+      case SYSTEM_MERGE:
+        return new VerifyMergeability(data);
+      case MERGE:
+      case DELETE:
+        return new CountFiles(data);
+      default:
+        throw new IllegalStateException("Unknown op " + data.op);
+    }
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
index ddbbce5b7e..ea63940db6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java
@@ -57,8 +57,7 @@ public class TableRangeOp extends ManagerRepo {
   @Override
   public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
 
-    if (AccumuloTable.ROOT.tableId().equals(data.tableId)
-        && MergeInfo.Operation.MERGE.equals(data.op)) {
+    if (AccumuloTable.ROOT.tableId().equals(data.tableId) && 
data.op.isMergeOp()) {
       log.warn("Attempt to merge tablets for {} does nothing. It is not 
splittable.",
           AccumuloTable.ROOT.tableName());
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java
new file mode 100644
index 0000000000..a7201a06ab
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.merge;
+
+import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.manager.Manager;
+import 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UnreserveSystemMerge extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger log = 
LoggerFactory.getLogger(UnreserveSystemMerge.class);
+  private final MergeInfo mergeInfo;
+  private final long maxFileCount;
+  private final long maxTotalSize;
+  private final UnmergeableReason reason;
+
+  public UnreserveSystemMerge(MergeInfo mergeInfo, UnmergeableReason reason, 
long maxFileCount,
+      long maxTotalSize) {
+    this.mergeInfo = mergeInfo;
+    this.reason = reason;
+    this.maxFileCount = maxFileCount;
+    this.maxTotalSize = maxTotalSize;
+  }
+
+  @Override
+  public Repo<Manager> call(FateId fateId, Manager environment) throws 
Exception {
+    FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
+    throw new 
AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
+        mergeInfo.op.isMergeOp() ? TableOperation.MERGE : 
TableOperation.DELETE_RANGE,
+        TableOperationExceptionType.OTHER, formatReason());
+  }
+
+  public UnmergeableReason getReason() {
+    return reason;
+  }
+
+  private String formatReason() {
+    switch (reason) {
+      case MAX_FILE_COUNT:
+        return "Aborted merge because it would produce a tablet with more 
files than the configured limit of "
+            + maxFileCount;
+      case MAX_TOTAL_SIZE:
+        return "Aborted merge because it would produce a tablet with a file 
size larger than the configured limit of "
+            + maxTotalSize;
+      // A non-contiguous range should not happen as the VerifyMergeability repo checks 
consistency, but
+      // it is handled just in case (presumably this refers to NOT_CONTIGUOUS - confirm)
+      case TABLET_MERGEABILITY:
+        return "Aborted merge because one ore more tablets in the merge range 
are unmergeable.";
+      case NOT_CONTIGUOUS:
+        return "Aborted merge because the tablets in a range do not form a 
linked list.";
+      default:
+        throw new IllegalArgumentException("Unknown Reason");
+    }
+
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java
similarity index 51%
copy from 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
copy to 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java
index 4083eecc86..43b6c1459e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java
@@ -19,22 +19,28 @@
 package org.apache.accumulo.manager.tableOps.merge;
 
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.merge.FindMergeableRangeTask.MergeableRange;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CountFiles extends ManagerRepo {
-  private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
+import com.google.common.base.Preconditions;
+
+public class VerifyMergeability extends ManagerRepo {
+  private static final Logger log = 
LoggerFactory.getLogger(VerifyMergeability.class);
   private static final long serialVersionUID = 1L;
   private final MergeInfo data;
 
-  public CountFiles(MergeInfo mergeInfo) {
+  public VerifyMergeability(MergeInfo mergeInfo) {
     this.data = mergeInfo;
+    Preconditions.checkArgument(data.op == Operation.SYSTEM_MERGE, "Must be a 
System Merge");
   }
 
   @Override
@@ -42,44 +48,33 @@ public class CountFiles extends ManagerRepo {
 
     var range = data.getReserveExtent();
 
-    long totalFiles = 0;
-
-    try (var tablets = 
env.getContext().getAmple().readTablets().forTable(data.tableId)
-        .overlapping(range.prevEndRow(), 
range.endRow()).fetch(FILES).checkConsistency().build()) {
-
-      switch (data.op) {
-        case MERGE:
-          for (var tabletMeta : tablets) {
-            totalFiles += tabletMeta.getFiles().size();
-          }
-          break;
-        case DELETE:
-          for (var tabletMeta : tablets) {
-            // Files in tablets that are completely contained within the merge 
range will be
-            // deleted, so do not count these files .
-            if (!data.getOriginalExtent().contains(tabletMeta.getExtent())) {
-              totalFiles += tabletMeta.getFiles().size();
-            }
-          }
-          break;
-        default:
-          throw new IllegalStateException("Unknown op " + data.op);
-      }
-    }
+    var currentTime = env.getSteadyTime();
+    var context = env.getContext();
+    var tableConf = context.getTableConfiguration(data.tableId);
+    var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+    var maxMergeabilityThreshold = 
tableConf.getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD);
 
-    long maxFiles = 
env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
+    // max percentage of split threshold
+    long maxTotalSize = (long) (splitThreshold * maxMergeabilityThreshold);
+    long maxFileCount = 
env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
         .getCount(Property.TABLE_MERGE_FILE_MAX);
 
-    log.debug("{} found {} files in the merge range, maxFiles is {}", fateId, 
totalFiles, maxFiles);
+    log.debug("Validating system merge for {} with range {}", fateId, range);
+
+    final var mr = new MergeableRange(data.tableId, currentTime, maxFileCount, 
maxTotalSize);
+    try (var tablets = 
env.getContext().getAmple().readTablets().forTable(data.tableId)
+        .overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, 
MERGEABILITY)
+        .checkConsistency().build()) {
 
-    if (totalFiles >= maxFiles) {
-      return new UnreserveAndError(data, totalFiles, maxFiles);
-    } else {
-      if (data.op == MergeInfo.Operation.MERGE) {
-        return new MergeTablets(data);
-      } else {
-        return new DeleteRows(data);
+      for (var tabletMetadata : tablets) {
+        var error = mr.add(tabletMetadata);
+        if (error.isPresent()) {
+          return new UnreserveSystemMerge(data, error.orElseThrow(), 
maxFileCount, maxTotalSize);
+        }
       }
     }
+
+    return new MergeTablets(data);
   }
+
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 89a0713ff0..c95f534366 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -59,6 +59,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ch
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
 import org.apache.accumulo.core.schema.Section;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Encoding;
@@ -289,7 +290,11 @@ public class Upgrader11to12 implements Upgrader {
     try {
       FileSystemInitializer initializer = new FileSystemInitializer(
           new InitialConfiguration(context.getHadoopConf(), 
context.getSiteConfiguration()));
-      FileSystemInitializer.InitialTablet scanRefTablet = 
initializer.createScanRefTablet(context);
+      // When upgrading an existing system, set the tablet to never merge. If 
the mergeability
+      // is changed then we would look to use the thrift client to look up the 
current Manager
+      // time to set as part of the mergeability metadata
+      FileSystemInitializer.InitialTablet scanRefTablet =
+          initializer.createScanRefTablet(context, 
TabletMergeabilityMetadata.never());
       // Add references to the Metadata Table
       try (BatchWriter writer = 
context.createBatchWriter(AccumuloTable.METADATA.tableName())) {
         writer.addMutation(scanRefTablet.createMutation());
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
index 3c7736d2a9..44b516ea3f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
@@ -49,6 +49,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.schema.Section;
@@ -134,8 +135,11 @@ public class Upgrader12to13 implements Upgrader {
     try {
       FileSystemInitializer initializer = new FileSystemInitializer(
           new InitialConfiguration(context.getHadoopConf(), 
context.getSiteConfiguration()));
+      // When upgrading an existing system, set the tablet to never merge. If 
the mergeability
+      // is changed then we would look to use the thrift client to look up the 
current Manager
+      // time to set as part of the mergeability metadata
       FileSystemInitializer.InitialTablet fateTableTableTablet =
-          initializer.createFateRefTablet(context);
+          initializer.createFateRefTablet(context, 
TabletMergeabilityMetadata.never());
       // Add references to the Metadata Table
       try (BatchWriter writer = 
context.createBatchWriter(AccumuloTable.METADATA.tableName())) {
         writer.addMutation(fateTableTableTablet.createMutation());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java 
b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
index 2f5ec912fb..8b4283f31d 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
@@ -20,6 +20,9 @@ package org.apache.accumulo.test.ample;
 
 import static 
org.apache.accumulo.test.ample.metadata.TestAmple.testAmpleServerContext;
 
+import java.time.Duration;
+
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
@@ -35,4 +38,14 @@ public class TestAmpleUtil {
     return manager;
   }
 
+  public static Manager mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample,
+      Duration currentTime) {
+    Manager manager = EasyMock.mock(Manager.class);
+    
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, 
ample))
+        .atLeastOnce();
+    
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes();
+    EasyMock.replay(manager);
+    return manager;
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java 
b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
index 4d8f8c44fd..4fb1a27d52 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
@@ -187,12 +187,13 @@ public class TestAmple {
                 WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
             Text row = decodedRow.firstKey().getRow();
             Mutation m = new Mutation(row);
-
             decodedRow.entrySet().stream().filter(e -> 
includeColumn.test(e.getKey(), e.getValue()))
                 .forEach(e -> m.put(e.getKey().getColumnFamily(), 
e.getKey().getColumnQualifier(),
                     e.getKey().getColumnVisibilityParsed(), 
e.getKey().getTimestamp(),
                     e.getValue()));
-            bw.addMutation(m);
+            if (!m.getUpdates().isEmpty()) {
+              bw.addMutation(m);
+            }
           }
         }
       }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
index 27de329921..644a2909a3 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,6 +53,7 @@ import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.FateKey.FateKeyType;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
@@ -619,7 +621,9 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
 
       assertEquals(1, idsSeen);
       assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count());
-      assertEquals(0, 
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+      // All other types should be a count of 0
+      Arrays.stream(FateKeyType.values()).filter(t -> 
!t.equals(FateKey.FateKeyType.SPLIT))
+          .forEach(t -> assertEquals(0, store.list(t).count()));
 
       for (var future : futures) {
         if (future.get().isPresent()) {
@@ -632,8 +636,9 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
         }
       }
 
-      assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count());
-      assertEquals(0, 
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+      // All types should be a count of 0
+      assertTrue(
+          Arrays.stream(FateKeyType.values()).allMatch(t -> 
store.list(t).findAny().isEmpty()));
 
     } finally {
       executor.shutdown();
@@ -676,6 +681,7 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     TableId tid1 = TableId.of("test");
     var extent1 = new KeyExtent(tid1, new Text("m"), null);
     var extent2 = new KeyExtent(tid1, null, new Text("m"));
+    var extent3 = new KeyExtent(tid1, new Text("z"), new Text("m"));
     var fateKey1 = FateKey.forSplit(extent1);
     var fateKey2 = FateKey.forSplit(extent2);
 
@@ -687,8 +693,12 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     var fateKey3 = FateKey.forCompactionCommit(cid1);
     var fateKey4 = FateKey.forCompactionCommit(cid2);
 
+    // use one overlapping extent and one different
+    var fateKey5 = FateKey.forMerge(extent1);
+    var fateKey6 = FateKey.forMerge(extent3);
+
     Map<FateKey,FateId> fateKeyIds = new HashMap<>();
-    for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
+    for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4, 
fateKey5, fateKey6)) {
       var fateId =
           seedTransaction(store, TEST_FATE_OP, fateKey, new TestRepo(), 
true).orElseThrow();
       fateKeyIds.put(fateKey, fateId);
@@ -698,10 +708,10 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     allIds.addAll(fateKeyIds.values());
     allIds.add(id1);
     assertEquals(allIds, 
store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
-    assertEquals(5, allIds.size());
+    assertEquals(7, allIds.size());
 
-    assertEquals(4, fateKeyIds.size());
-    assertEquals(4, fateKeyIds.values().stream().distinct().count());
+    assertEquals(6, fateKeyIds.size());
+    assertEquals(6, fateKeyIds.values().stream().distinct().count());
 
     HashSet<KeyExtent> seenExtents = new HashSet<>();
     store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> {
@@ -709,9 +719,18 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
       assertNotNull(fateKeyIds.remove(fateKey));
       assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow()));
     });
+    assertEquals(4, fateKeyIds.size());
+    assertEquals(Set.of(extent1, extent2), seenExtents);
 
+    // clear set as one overlaps
+    seenExtents.clear();
+    store.list(FateKeyType.MERGE).forEach(fateKey -> {
+      assertEquals(FateKey.FateKeyType.MERGE, fateKey.getType());
+      assertNotNull(fateKeyIds.remove(fateKey));
+      assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow()));
+    });
     assertEquals(2, fateKeyIds.size());
-    assertEquals(Set.of(extent1, extent2), seenExtents);
+    assertEquals(Set.of(extent1, extent3), seenExtents);
 
     HashSet<ExternalCompactionId> seenCids = new HashSet<>();
     store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> {
@@ -722,6 +741,7 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
 
     assertEquals(0, fateKeyIds.size());
     assertEquals(Set.of(cid1, cid2), seenCids);
+
     // Cleanup so we don't interfere with other tests
     store.list()
         .forEach(fateIdStatus -> 
store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
index 617eba0ba1..59548cc738 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -25,18 +25,22 @@ import static 
org.apache.accumulo.test.ample.metadata.ConditionalWriterIntercept
 import static org.apache.accumulo.test.ample.metadata.TestAmple.not;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +49,7 @@ import java.util.stream.Stream;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletMergeability;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
 import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
@@ -59,6 +64,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -70,6 +76,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.manager.Manager;
+import 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.compact.CompactionDriver;
 import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
@@ -77,10 +84,13 @@ import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
 import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
 import org.apache.accumulo.manager.tableOps.merge.MergeTablets;
 import org.apache.accumulo.manager.tableOps.merge.ReserveTablets;
+import org.apache.accumulo.manager.tableOps.merge.UnreserveSystemMerge;
+import org.apache.accumulo.manager.tableOps.merge.VerifyMergeability;
 import org.apache.accumulo.manager.tableOps.split.AllocateDirsAndEnsureOnline;
 import org.apache.accumulo.manager.tableOps.split.FindSplits;
 import org.apache.accumulo.manager.tableOps.split.PreSplit;
 import org.apache.accumulo.manager.tableOps.split.SplitInfo;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.LargeSplitRowIT;
 import org.apache.accumulo.test.ample.metadata.TestAmple;
 import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
@@ -107,7 +117,7 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
   }
 
   @ParameterizedTest
-  @EnumSource(MergeInfo.Operation.class)
+  @EnumSource(value = MergeInfo.Operation.class, names = {"MERGE", "DELETE"})
   public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws 
Exception {
     String[] tableNames = getUniqueNames(2);
     String metadataTable = tableNames[0] + operation;
@@ -163,6 +173,108 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testVerifyMergeability() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+
+      SortedMap<Text,TabletMergeability> splits = new TreeMap<>();
+      splits.put(new Text("a"), TabletMergeability.always());
+      splits.put(new Text("b"), TabletMergeability.always());
+      splits.put(new Text("c"), TabletMergeability.never());
+      splits.put(new Text("d"), TabletMergeability.after(Duration.ofDays(2)));
+      splits.put(new Text("e"), TabletMergeability.always());
+
+      client.tableOperations().create(userTable,
+          new 
NewTableConfiguration().setProperties(Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(),
+              "10K", Property.TABLE_MAJC_RATIO.getKey(), "9999",
+              Property.TABLE_MERGE_FILE_MAX.getKey(), 
"10")).withSplits(splits));
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      // Set up Test ample and manager
+      TestAmple.createMetadataTable(client, metadataTable);
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+      testAmple.createMetadataFromExisting(client, tableId);
+      Manager manager =
+          mockWithAmple(getCluster().getServerContext(), testAmple, 
Duration.ofDays(1));
+
+      // Create a test fate id
+      var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+      // Tablet c is set to never merge
+      MergeInfo mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+          null, new Text("c").getBytes(), Operation.SYSTEM_MERGE);
+      var repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      assertInstanceOf(UnreserveSystemMerge.class, repo);
+      assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
+          ((UnreserveSystemMerge) repo).getReason());
+
+      // Tablets a and b are set to always merge
+      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId), null,
+          new Text("b").getBytes(), Operation.SYSTEM_MERGE);
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+
+      var context = manager.getContext();
+
+      // split threshold is 10k so default max merge size is 2500 bytes.
+      // this adds 6 files of 450 each which puts the tablets over the 2500 
threshold
+      addFileMetadata(context, tableId, null, new Text("c"), 3, 450);
+
+      // Data written to the first two tablets totals 2700 bytes and is too 
large
+      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      assertInstanceOf(UnreserveSystemMerge.class, repo);
+      assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge) 
repo).getReason());
+
+      // Not enough time has passed for tablet d, should not be able to merge d and e
+      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+          new Text("c").getBytes(), new Text("e").getBytes(), 
Operation.SYSTEM_MERGE);
+      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      assertInstanceOf(UnreserveSystemMerge.class, repo);
+      assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
+          ((UnreserveSystemMerge) repo).getReason());
+
+      // update time to 3 days so enough time has passed
+      manager = mockWithAmple(getCluster().getServerContext(), testAmple, 
Duration.ofDays(3));
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+
+      // last 3 tablets should total 9 files which is < max of 10
+      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+          new Text("c").getBytes(), null, Operation.SYSTEM_MERGE);
+      addFileMetadata(context, tableId, new Text("c"), null, 3, 10);
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+
+      // last 3 tablets should total 12 files which is > max of 10
+      addFileMetadata(context, tableId, new Text("c"), null, 4, 10);
+      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      assertInstanceOf(UnreserveSystemMerge.class, repo);
+      assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge) 
repo).getReason());
+    }
+  }
+
+  private void addFileMetadata(ServerContext context, TableId tableId, Text 
start, Text end,
+      int numFiles, int fileSize) {
+    try (
+        var tablets =
+            
context.getAmple().readTablets().forTable(tableId).overlapping(start, 
end).build();
+        var tabletsMutator = context.getAmple().mutateTablets()) {
+      for (var tabletMeta : tablets) {
+        var tabletMutator = 
tabletsMutator.mutateTablet(tabletMeta.getExtent());
+        for (int i = 0; i < numFiles; i++) {
+          StoredTabletFile f = StoredTabletFile.of(new 
org.apache.hadoop.fs.Path(
+              "file:///accumulo/tables/1/" + tabletMeta.getDirName() + "/F" + 
i + ".rf"));
+          DataFileValue dfv = new DataFileValue(fileSize, 100);
+          tabletMutator.putFile(f, dfv);
+        }
+        tabletMutator.mutate();
+      }
+    }
+  }
+
   @Test
   public void testSplitOffline() throws Exception {
     String[] tableNames = getUniqueNames(2);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
index e97c239e95..5a72053d93 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -127,12 +126,7 @@ public class AddSplitIT extends SharedMiniClusterBase {
       verifyData(c, tableName, 2L);
 
       TableId id = TableId.of(c.tableOperations().tableIdMap().get(tableName));
-      try (TabletsMetadata tm =
-          
getCluster().getServerContext().getAmple().readTablets().forTable(id).build()) {
-        // Default for user created tablets should be mergeability set to NEVER
-        tm.stream().forEach(tablet -> 
assertEquals(TabletMergeabilityMetadata.never(),
-            tablet.getTabletMergeability()));
-      }
+      verifySplits(id, TabletMergeabilityUtil.userDefaultSplits(splits));
     }
   }
 
@@ -360,9 +354,9 @@ public class AddSplitIT extends SharedMiniClusterBase {
         
getCluster().getServerContext().getAmple().readTablets().forTable(id).build()) {
       tm.stream().forEach(t -> {
         var split = t.getEndRow();
-        // default tablet should be set to never
+        // default tablet should be set to always
         if (split == null) {
-          assertEquals(TabletMergeability.never(),
+          assertEquals(TabletMergeability.always(),
               t.getTabletMergeability().getTabletMergeability());
         } else {
           assertTrue(addedSplits.remove(split));
@@ -381,9 +375,9 @@ public class AddSplitIT extends SharedMiniClusterBase {
     c.tableOperations().getTabletInformation(tableName, new 
Range()).forEach(ti -> {
       var tmInfo = ti.getTabletMergeabilityInfo();
       var split = ti.getTabletId().getEndRow();
-      // default tablet should always be set to never
+      // default tablet should always be set to always
       if (split == null) {
-        assertEquals(TabletMergeability.never(),
+        assertEquals(TabletMergeability.always(),
             ti.getTabletMergeabilityInfo().getTabletMergeability());
       } else {
         assertTrue(addedSplits.remove(split));
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java
index bc3176258a..40cf9b0f51 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java
@@ -139,10 +139,10 @@ public class CreateInitialSplitsIT extends 
SharedMiniClusterBase {
     var tableId = getCluster().getServerContext().getTableId(tableName);
     try (var tablets =
         
getCluster().getServerContext().getAmple().readTablets().forTable(tableId).build())
 {
-      // default tablet (null end row) should have a default 
TabletMergeability of never for user
+      // default tablet (null end row) should have a default 
TabletMergeability of always for user
       // created tablets
       assertTrue(tablets.stream()
-          .anyMatch(tm -> tm.getEndRow() == null && 
tm.getTabletMergeability().isNever()));
+          .anyMatch(tm -> tm.getEndRow() == null && 
tm.getTabletMergeability().isAlways()));
       // other splits should be created with a duration of 10 seconds
       assertEquals(10, tablets.stream().filter(tm -> 
tm.getTabletMergeability().getDelay()
           .map(delay -> delay.equals(splitDuration)).orElse(false)).count());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index a985d3969a..661afa74b2 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -262,11 +262,9 @@ public class SplitIT extends AccumuloClusterHarness {
           }
           if (TabletColumnFamily.MERGEABILITY_COLUMN.getColumnQualifier()
               .equals(entry.getKey().getColumnQualifier())) {
-            // Default tablet should be set to NEVER, all newly generated 
system splits should be
+            // Default tablet should be set to ALWAYS, all newly generated 
system splits should be
             // set to ALWAYS
-            var mergeability =
-                extent.endRow() == null ? TabletMergeability.never() : 
TabletMergeability.always();
-            assertEquals(mergeability,
+            assertEquals(TabletMergeability.always(),
                 
TabletMergeabilityMetadata.fromValue(entry.getValue()).getTabletMergeability());
           }
           count++;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java
new file mode 100644
index 0000000000..eb93dad34a
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.test.TestIngest.generateRow;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.countTablets;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.TabletMergeability;
+import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.AccumuloTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
+import org.apache.accumulo.test.util.FileMetadataUtil;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TabletMergeabilityIT extends SharedMiniClusterBase {
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(5);
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new Callback());
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  private static class Callback implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      // Configure a short period of time to run the auto merge thread for 
testing
+      cfg.setProperty(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL, "3s");
+    }
+  }
+
+  @Test
+  public void testMergeabilityAlwaysUserTable() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().create(tableName);
+      var tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+      testMergeabilityAlways(c, tableName, "", Set.of(new KeyExtent(tableId, 
null, null)));
+    }
+  }
+
+  @Test
+  public void testMergeabilityMetadata() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      var splitPoint = 
MetadataSchema.TabletsSection.getRange().getEndKey().getRow();
+      // Test merge with new splits added after splitPoint default tablet 
(which is mergeable)
+      // Should keep tablets section tablet on merge
+      testMergeabilityAlways(c, AccumuloTable.METADATA.tableName(), "~",
+          Set.of(new KeyExtent(AccumuloTable.METADATA.tableId(), null, 
splitPoint),
+              new KeyExtent(AccumuloTable.METADATA.tableId(), splitPoint, 
null)));
+    }
+  }
+
+  @Test
+  public void testMergeabilityFate() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      testMergeabilityAlways(c, AccumuloTable.FATE.tableName(), "",
+          Set.of(new KeyExtent(AccumuloTable.FATE.tableId(), null, null)));
+    }
+  }
+
+  @Test
+  public void testMergeabilityScanRef() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      testMergeabilityAlways(c, AccumuloTable.SCAN_REF.tableName(), "",
+          Set.of(new KeyExtent(AccumuloTable.SCAN_REF.tableId(), null, null)));
+    }
+  }
+
+  private void testMergeabilityAlways(AccumuloClient c, String tableName, 
String newSplitPrefix,
+      Set<KeyExtent> expectedMergedExtents) throws Exception {
+    var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+    TreeSet<Text> splits = new TreeSet<>();
+    splits.add(new Text(newSplitPrefix + String.format("%09d", 333)));
+    splits.add(new Text(newSplitPrefix + String.format("%09d", 666)));
+    splits.add(new Text(newSplitPrefix + String.format("%09d", 999)));
+
+    // create splits with mergeability disabled so the task does not merge them away
+    // The default tablet is always mergeable, but it is currently the only one that is mergeable,
+    // so nothing will merge
+    c.tableOperations().putSplits(tableName, 
TabletMergeabilityUtil.userDefaultSplits(splits));
+    Wait.waitFor(() -> countTablets(getCluster().getServerContext(), 
tableName, tm -> true)
+        == splits.size() + expectedMergedExtents.size(), 5000, 200);
+
+    // update new splits to always mergeable so the task can now merge tablets
+    c.tableOperations().putSplits(tableName, 
TabletMergeabilityUtil.systemDefaultSplits(splits));
+
+    // Wait for merge, and check extents
+    Wait.waitFor(
+        () -> hasExactTablets(getCluster().getServerContext(), tableId, 
expectedMergedExtents),
+        10000, 2000);
+  }
+
+  @Test
+  public void testMergeabilityMultipleRanges() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().create(tableName);
+      var tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      SortedMap<Text,TabletMergeability> splits = new TreeMap<>();
+      splits.put(new Text(String.format("%09d", 333)), 
TabletMergeability.never());
+      splits.put(new Text(String.format("%09d", 555)), 
TabletMergeability.never());
+      splits.put(new Text(String.format("%09d", 666)), 
TabletMergeability.never());
+      splits.put(new Text(String.format("%09d", 999)), 
TabletMergeability.never());
+
+      c.tableOperations().putSplits(tableName, splits);
+      Wait.waitFor(() -> countTablets(getCluster().getServerContext(), 
tableName, tm -> true) == 5,
+          5000, 500);
+
+      splits.put(new Text(String.format("%09d", 333)), 
TabletMergeability.always());
+      splits.put(new Text(String.format("%09d", 555)), 
TabletMergeability.always());
+      // Keep tablet 666 as never, this should cause two fate jobs for merging
+      splits.put(new Text(String.format("%09d", 999)), 
TabletMergeability.always());
+      c.tableOperations().putSplits(tableName, splits);
+
+      // Wait for merge, we should have 3 tablets
+      // 333 and 555 should be merged into 555
+      // 666
+      // 999 and default merged into default
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), 
tableId,
+          Set.of(new KeyExtent(tableId, new Text(String.format("%09d", 555)), 
null),
+              new KeyExtent(tableId, new Text(String.format("%09d", 666)),
+                  new Text(String.format("%09d", 555))),
+              new KeyExtent(tableId, null, new Text(String.format("%09d", 
666))))),
+          10000, 200);
+
+    }
+  }
+
+  @Test
+  public void testMergeabilityThresholdMultipleRanges() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "32K");
+      props.put(Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), ".5");
+      c.tableOperations().create(tableName, new NewTableConfiguration()
+          
.withInitialTabletAvailability(TabletAvailability.HOSTED).setProperties(props));
+      var tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      SortedMap<Text,TabletMergeability> splits = new TreeMap<>();
+      // Create new tablets that won't merge automatically
+      for (int i = 10000; i <= 90000; i += 10000) {
+        splits.put(row(i), TabletMergeability.never());
+      }
+
+      c.tableOperations().putSplits(tableName, splits);
+      // Verify we now have 10 tablets
+      // [row_0000010000, row_0000020000, row_0000030000, row_0000040000, 
row_0000050000,
+      // row_0000060000, row_0000070000, row_0000080000, row_0000090000, 
default]
+      Wait.waitFor(() -> countTablets(getCluster().getServerContext(), 
tableName, tm -> true) == 10,
+          5000, 500);
+
+      // Insert rows into each tablet with different numbers of rows
+      // Tablets with end rows row_0000020000 - row_0000040000, row_0000060000 
- row_0000080000,
+      // default will have 1000 rows
+      // Tablets with end rows row_0000010000, row_0000050000, row_0000090000 
will have 5000 rows
+      try (BatchWriter bw = c.createBatchWriter(tableName)) {
+        final var value = StringUtils.repeat("a", 1024);
+        for (int i = 0; i < 100000; i += 10000) {
+          var rows = 1000;
+          if (i % 40000 == 0) {
+            rows = 5000;
+          }
+          for (int j = 0; j < rows; j++) {
+            Mutation m = new Mutation(row(i + j));
+            m.put(new Text("cf1"), new Text("cq1"), new Value(value));
+            bw.addMutation(m);
+          }
+        }
+      }
+      c.tableOperations().flush(tableName, null, null, true);
+
+      // Set all 10 tablets to be auto-mergeable
+      for (int i = 10000; i <= 90000; i += 10000) {
+        splits.put(row(i), TabletMergeability.always());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // With the mergeability threshold set to 50% of 32KB we should be able 
to merge together
+      // the tablets with 1000 rows, but not 5000 rows. This should produce 
the following
+      // 6 tablets after merger.
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), 
tableId,
+          Set.of(new KeyExtent(tableId, row(10000), null),
+              new KeyExtent(tableId, row(40000), row(10000)),
+              new KeyExtent(tableId, row(50000), row(40000)),
+              new KeyExtent(tableId, row(80000), row(50000)),
+              new KeyExtent(tableId, row(90000), row(80000)),
+              new KeyExtent(tableId, null, row(90000)))),
+          10000, 200);
+    }
+  }
+
+  /**
+   * Verifies that a table which has split after ingest is merged back down to a single default
+   * tablet once all of its data has been deleted and compacted away.
+   */
+  @Test
+  public void testSplitAndMergeAll() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
+      props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
+      c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest data so tablet will split
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 5_000);
+      TestIngest.ingest(c, params);
+      c.tableOperations().flush(tableName);
+      VerifyIngest.verifyIngest(c, params);
+
+      // Wait for table to split, should be more than 10 tablets
+      Wait.waitFor(() -> c.tableOperations().listSplits(tableName).size() > 10, 30000, 200);
+
+      // Delete all the data - We can't use deleteRows() as that would merge empty tablets
+      // Instead, we want the mergeability thread to merge so use a batch deleter and
+      // compact away the deleted data. The deleter is closed as soon as the delete finishes
+      // so its client-side resources are not leaked for the rest of the test.
+      try (var bd = c.createBatchDeleter(tableName, Authorizations.EMPTY, 1)) {
+        bd.setRanges(List.of(new Range()));
+        bd.delete();
+      }
+      c.tableOperations().compact(tableName, new CompactionConfig().setFlush(true));
+
+      // Wait for merge back to default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 30000, 200);
+    }
+  }
+
+  @Test
+  public void testMergeabilityThreshold() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
+      props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
+      // Set a low threshold to 1% of the split threshold
+      props.put(Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), ".01");
+      c.tableOperations().create(tableName, new 
NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest data so tablet will split
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 
5_000);
+      TestIngest.ingest(c, params);
+      c.tableOperations().flush(tableName);
+      VerifyIngest.verifyIngest(c, params);
+
+      // Wait for table to split, should be more than 10 tablets
+      Wait.waitFor(() -> c.tableOperations().listSplits(tableName).size() > 
10, 10000, 200);
+
+      // Set the split threshold back to the default of 5 MB. There's not a 
lot of data so normally
+      // we could merge back to 1 tablet, but the threshold is too low at 1% 
so it should not merge
+      // yet.
+      c.tableOperations().setProperty(tableName, 
Property.TABLE_SPLIT_THRESHOLD.getKey(), "5m");
+
+      // Should not merge so make sure it throws IllegalStateException
+      assertThrows(IllegalStateException.class,
+          () -> Wait.waitFor(() -> 
hasExactTablets(getCluster().getServerContext(), tableId,
+              Set.of(new KeyExtent(tableId, null, null))), 5000, 500));
+      // Make sure we failed because of exact tablets and not a different 
IllegalStateException
+      assertFalse(hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))));
+
+      // With a 10% threshold we should be able to merge
+      c.tableOperations().setProperty(tableName, 
Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(),
+          ".1");
+
+      // Wait for merge back to default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), 
tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+
+      // re-verify the data after merge
+      VerifyIngest.verifyIngest(c, params);
+    }
+  }
+
+  /**
+   * Verifies that splits created with {@code TabletMergeability.after(delay)} are merged back
+   * into the default tablet, and that more than {@code delay} of manager time has elapsed by the
+   * time the merge is observed.
+   */
+  @Test
+  public void testMergeAfter() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().create(tableName);
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      TreeSet<Text> splits = new TreeSet<>();
+      splits.add(new Text(String.format("%09d", 333)));
+      splits.add(new Text(String.format("%09d", 666)));
+      splits.add(new Text(String.format("%09d", 999)));
+
+      // Use the same delay value for creating the splits and for the elapsed-time assertion
+      // below so the two cannot drift apart
+      var delay = Duration.ofSeconds(5);
+      var startTime = c.instanceOperations().getManagerTime();
+      c.tableOperations().putSplits(tableName,
+          TabletMergeabilityUtil.splitsWithDefault(splits, TabletMergeability.after(delay)));
+
+      Wait.waitFor(() -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 4,
+          5000, 200);
+
+      // Wait for merge back to default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+
+      // The merge must not have been observed before the configured delay elapsed
+      var elapsed = c.instanceOperations().getManagerTime().minus(startTime);
+      assertTrue(elapsed.compareTo(delay) > 0);
+    }
+  }
+
+  @Test
+  public void testMergeabilityMaxFiles() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      // disable compactions and set a low merge file max
+      props.put(Property.TABLE_MAJC_RATIO.getKey(), "9999");
+      props.put(Property.TABLE_MERGE_FILE_MAX.getKey(), "3");
+      c.tableOperations().create(tableName, new 
NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Create new tablets that won't merge automatically
+      SortedMap<Text,TabletMergeability> splits = new TreeMap<>();
+      for (int i = 500; i < 5000; i += 500) {
+        splits.put(row(i), TabletMergeability.never());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // Verify we now have 10 tablets
+      Wait.waitFor(() -> countTablets(getCluster().getServerContext(), 
tableName, tm -> true) == 10,
+          5000, 500);
+
+      // Ingest data into the pre-split tablets, each tablet will have several files because
+      // of the flush setting
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 
5_000);
+      params.startRow = 0;
+      params.flushAfterRows = 100;
+      TestIngest.ingest(c, params);
+      VerifyIngest.verifyIngest(c, params);
+
+      assertTrue(FileMetadataUtil.countFiles(getCluster().getServerContext(), 
tableName) > 3);
+
+      // Mark all tablets as mergeable
+      for (int i = 500; i < 5000; i += 500) {
+        splits.put(row(i), TabletMergeability.always());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // Should not merge as we set max file count to only 3 and there are 
more files than that
+      // per tablet, so make sure it throws IllegalStateException
+      assertThrows(IllegalStateException.class,
+          () -> Wait.waitFor(() -> 
hasExactTablets(getCluster().getServerContext(), tableId,
+              Set.of(new KeyExtent(tableId, null, null))), 5000, 500));
+      // Make sure tablets is still 10, not merged
+      assertEquals(10, countTablets(getCluster().getServerContext(), 
tableName, tm -> true));
+
+      // Set max merge file count back to default of 10k
+      c.tableOperations().setProperty(tableName, 
Property.TABLE_MERGE_FILE_MAX.getKey(), "10000");
+
+      // Should merge back to 1 tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), 
tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+
+      // re-verify the data after merge
+      VerifyIngest.verifyIngest(c, params);
+    }
+  }
+
+  private static boolean hasExactTablets(ServerContext ctx, TableId tableId,
+      Set<KeyExtent> expected) {
+    try (var tabletsMetadata = 
ctx.getAmple().readTablets().forTable(tableId).build()) {
+      // check for exact tablets by counting tablets that match the expected 
rows and also
+      // making sure the number seen equals exactly expected
+      final var expectedTablets = new HashSet<>(expected);
+      for (TabletMetadata tm : tabletsMetadata) {
+        // make sure every tablet seen is contained in the expected set
+        if (!expectedTablets.remove(tm.getExtent())) {
+          return false;
+        }
+      }
+      // Verify all tablets seen
+      return expectedTablets.isEmpty();
+    }
+  }
+
+  private static Text row(int row) {
+    return generateRow(row, 0);
+  }
+}


Reply via email to