This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f88461430580f39d4868e2f530fececd1e7b7e95
Merge: 5edcccc277 f9897862dd
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Tue Dec 19 07:18:50 2023 -0500

    Merge branch 'main' into elasticity

 core/pom.xml                                       |   4 +-
 .../accumulo/core/conf/ClientConfigGenerate.java   |   8 +-
 .../accumulo/core/conf/ConfigurationDocGen.java    |   4 +-
 .../apache/accumulo/core/logging/TabletLogger.java |   3 +-
 .../accumulo/core/metadata/schema/Ample.java       |   2 -
 .../metadata/schema/TabletMetadataBuilder.java     |   5 -
 .../core/metadata/schema/TabletMutatorBase.java    |  10 +-
 .../accumulo/core/tabletserver/log/LogEntry.java   | 137 +++++++++++++-------
 .../core/metadata/schema/TabletMetadataTest.java   |  11 +-
 .../core/tabletserver/log/LogEntryTest.java        | 139 ++++++++++-----------
 .../org/apache/accumulo/server/fs/VolumeUtil.java  |  21 +---
 .../manager/state/AbstractTabletStateStore.java    |   2 +-
 .../accumulo/server/util/ListVolumesUsed.java      |   2 +-
 .../apache/accumulo/server/fs/VolumeUtilTest.java  |  12 +-
 .../server/manager/state/TabletManagementTest.java |   4 +-
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |   2 +-
 .../accumulo/manager/recovery/RecoveryManager.java |   2 +-
 .../accumulo/tserver/TabletClientHandler.java      |   8 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  17 +--
 .../org/apache/accumulo/tserver/log/DfsLogger.java |  91 +++++---------
 .../accumulo/tserver/log/TabletServerLogger.java   |  33 +++--
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  39 +++---
 .../accumulo/tserver/WalRemovalOrderTest.java      |  43 ++++---
 .../shell/commands/ListTabletsCommandTest.java     |   4 +-
 .../test/MissingWalHeaderCompletesRecoveryIT.java  |   4 +-
 .../test/functional/AmpleConditionalWriterIT.java  |  12 +-
 .../functional/TabletManagementIteratorIT.java     |   2 +-
 27 files changed, 294 insertions(+), 327 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index da10ce2676,44832db7f1..0bdf7e2055
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@@ -23,7 -23,7 +23,8 @@@ import static java.util.stream.Collecto
  import java.util.Collection;
  import java.util.List;
  import java.util.Optional;
 +import java.util.SortedSet;
+ import java.util.UUID;
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.client.admin.CompactionConfig;
@@@ -161,9 -160,9 +162,9 @@@ public class TabletLogger 
      fileLog.debug("Imported {} {}  ", extent, file);
    }
  
 -  public static void recovering(KeyExtent extent, List<LogEntry> logEntries) {
 +  public static void recovering(KeyExtent extent, Collection<LogEntry> logEntries) {
      if (recoveryLog.isDebugEnabled()) {
-       List<String> logIds = logEntries.stream().map(LogEntry::getUniqueID).collect(toList());
+       List<UUID> logIds = logEntries.stream().map(LogEntry::getUniqueID).collect(toList());
       recoveryLog.debug("For {} recovering data from walogs: {}", extent, logIds);
      }
    }
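
For context on the signature change above, a minimal hypothetical call site; the tabletMetadata
variable is a placeholder for a TabletMetadata object whose getLogs() returns the tablet's LogEntry
collection, as used elsewhere in this merge:

    // Hypothetical caller sketch: the collection from tablet metadata is passed through
    // directly, and the logged IDs come from LogEntry.getUniqueID(), which now yields UUIDs.
    Collection<LogEntry> walogs = tabletMetadata.getLogs();
    TabletLogger.recovering(tabletMetadata.getExtent(), walogs);
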
diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index d8bb5d62e9,d551b342af..55abba98b1
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@@ -344,81 -265,50 +344,79 @@@ public interface Ample 
    /**
     * Interface for changing a tablets persistent data.
     */
 -  interface TabletMutator {
 -    TabletMutator putPrevEndRow(Text per);
 +  interface TabletUpdates<T> {
 +    T putPrevEndRow(Text per);
 +
 +    T putFile(ReferencedTabletFile path, DataFileValue dfv);
 +
 +    T putFile(StoredTabletFile path, DataFileValue dfv);
 +
 +    T deleteFile(StoredTabletFile path);
 +
 +    T putScan(StoredTabletFile path);
 +
 +    T deleteScan(StoredTabletFile path);
  
 -    TabletMutator putFile(ReferencedTabletFile path, DataFileValue dfv);
 +    T putFlushId(long flushId);
  
 -    TabletMutator putFile(StoredTabletFile path, DataFileValue dfv);
 +    T putLocation(Location location);
  
 -    TabletMutator deleteFile(StoredTabletFile path);
 +    T deleteLocation(Location location);
  
 -    TabletMutator putScan(StoredTabletFile path);
 +    T putZooLock(String zookeeperRoot, ServiceLock zooLock);
  
 -    TabletMutator deleteScan(StoredTabletFile path);
 +    T putDirName(String dirName);
  
 -    TabletMutator putCompactionId(long compactionId);
 +    T putWal(LogEntry logEntry);
  
-     T deleteWal(String wal);
- 
 -    TabletMutator putFlushId(long flushId);
 +    T deleteWal(LogEntry logEntry);
  
 -    TabletMutator putLocation(Location location);
 +    T putTime(MetadataTime time);
  
 -    TabletMutator deleteLocation(Location location);
 +    T putBulkFile(ReferencedTabletFile bulkref, long tid);
  
 -    TabletMutator putZooLock(ServiceLock zooLock);
 +    T deleteBulkFile(StoredTabletFile bulkref);
  
 -    TabletMutator putDirName(String dirName);
 +    T putSuspension(TServerInstance tserver, long suspensionTime);
  
 -    TabletMutator putWal(LogEntry logEntry);
 +    T deleteSuspension();
  
 -    TabletMutator deleteWal(LogEntry wal);
 +    T putExternalCompaction(ExternalCompactionId ecid, ExternalCompactionMetadata ecMeta);
  
 -    TabletMutator putTime(MetadataTime time);
 +    T deleteExternalCompaction(ExternalCompactionId ecid);
  
 -    TabletMutator putBulkFile(ReferencedTabletFile bulkref, long tid);
 +    T putCompacted(long fateTxid);
  
 -    TabletMutator deleteBulkFile(StoredTabletFile bulkref);
 +    T deleteCompacted(long fateTxid);
  
 -    TabletMutator putSuspension(TServerInstance tserver, long suspensionTime);
 +    T putHostingGoal(TabletHostingGoal goal);
  
 -    TabletMutator deleteSuspension();
 +    T setHostingRequested();
  
 -    TabletMutator putExternalCompaction(ExternalCompactionId ecid,
 -        ExternalCompactionMetadata ecMeta);
 +    T deleteHostingRequested();
  
 -    TabletMutator deleteExternalCompaction(ExternalCompactionId ecid);
 +    T putOperation(TabletOperationId opId);
 +
 +    T deleteOperation();
 +
 +    T putSelectedFiles(SelectedFiles selectedFiles);
 +
 +    T deleteSelectedFiles();
  
 +    /**
 +     * Deletes all the columns in the keys.
 +     *
 +     * @throws IllegalArgumentException if rows in keys do not match tablet row or column visibility
 +     *         is not empty
 +     */
 +    T deleteAll(Set<Key> keys);
 +
 +    T setMerged();
 +
 +    T deleteMerged();
 +  }
 +
 +  interface TabletMutator extends TabletUpdates<TabletMutator> {
      /**
      * This method persist (or queues for persisting) previous put and deletes against this object.
      * Unless this method is called, previous calls will never be persisted. The purpose of this
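
As a rough illustration of the TabletUpdates<T> contract above, a fluent tablet update might read as
follows; ample, extent, and the two LogEntry variables are placeholders, mutateTablet(...) is assumed
to return an Ample.TabletMutator, and mutate() is the assumed name of the persisting method whose
javadoc appears above:

    // Hypothetical sketch: TabletMutator extends TabletUpdates<TabletMutator>, so every
    // chained put/delete returns the same mutator.
    Ample.TabletMutator mutator = ample.mutateTablet(extent);
    mutator.putDirName("t-0000001")
        .putWal(newLogEntry)      // WAL columns are now keyed by LogEntry
        .deleteWal(oldLogEntry);  // deleteWal(LogEntry) replaces the removed deleteWal(String)
    mutator.mutate();             // assumed persisting call; nothing is written until it runs
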
diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
index f2bd30f15e,0000000000..7d102ea7ef
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
@@@ -1,303 -1,0 +1,298 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.metadata.schema;
 +
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 +
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.hadoop.io.Text;
 +
 +public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadataBuilder> {
 +
 +  public static class InternalBuilder extends TabletMutatorBase<InternalBuilder> {
 +    protected InternalBuilder(KeyExtent extent) {
 +      super(extent);
 +    }
 +
 +    @Override
 +    public Mutation getMutation() {
 +      return super.getMutation();
 +    }
 +  }
 +
 +  private final InternalBuilder internalBuilder;
 +  EnumSet<TabletMetadata.ColumnType> fetched;
 +
 +  protected TabletMetadataBuilder(KeyExtent extent) {
 +    internalBuilder = new InternalBuilder(extent);
 +    fetched = EnumSet.noneOf(TabletMetadata.ColumnType.class);
 +    putPrevEndRow(extent.prevEndRow());
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putPrevEndRow(Text per) {
 +    fetched.add(PREV_ROW);
 +    internalBuilder.putPrevEndRow(per);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putFile(ReferencedTabletFile path, DataFileValue dfv) {
 +    fetched.add(FILES);
 +    internalBuilder.putFile(path, dfv);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putFile(StoredTabletFile path, DataFileValue dfv) {
 +    fetched.add(FILES);
 +    internalBuilder.putFile(path, dfv);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteFile(StoredTabletFile path) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putScan(StoredTabletFile path) {
 +    fetched.add(SCANS);
 +    internalBuilder.putScan(path);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteScan(StoredTabletFile path) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putFlushId(long flushId) {
 +    fetched.add(FLUSH_ID);
 +    internalBuilder.putFlushId(flushId);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putLocation(TabletMetadata.Location location) {
 +    fetched.add(LOCATION);
 +    internalBuilder.putLocation(location);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteLocation(TabletMetadata.Location location) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putZooLock(String zookeeperRoot, ServiceLock zooLock) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putDirName(String dirName) {
 +    fetched.add(DIR);
 +    internalBuilder.putDirName(dirName);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putWal(LogEntry logEntry) {
 +    fetched.add(LOGS);
 +    internalBuilder.putWal(logEntry);
 +    return this;
 +  }
 +
-   @Override
-   public TabletMetadataBuilder deleteWal(String wal) {
-     throw new UnsupportedOperationException();
-   }
- 
 +  @Override
 +  public TabletMetadataBuilder deleteWal(LogEntry logEntry) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putTime(MetadataTime time) {
 +    fetched.add(TIME);
 +    internalBuilder.putTime(time);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, long tid) {
 +    fetched.add(LOADED);
 +    internalBuilder.putBulkFile(bulkref, tid);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteBulkFile(StoredTabletFile bulkref) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putSuspension(TServerInstance tserver, long suspensionTime) {
 +    fetched.add(SUSPEND);
 +    internalBuilder.putSuspension(tserver, suspensionTime);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteSuspension() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putExternalCompaction(ExternalCompactionId ecid,
 +      ExternalCompactionMetadata ecMeta) {
 +    fetched.add(ECOMP);
 +    internalBuilder.putExternalCompaction(ecid, ecMeta);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putCompacted(long fateTxId) {
 +    fetched.add(COMPACTED);
 +    internalBuilder.putCompacted(fateTxId);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteCompacted(long fateTxId) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putHostingGoal(TabletHostingGoal goal) {
 +    fetched.add(HOSTING_GOAL);
 +    internalBuilder.putHostingGoal(goal);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder setHostingRequested() {
 +    fetched.add(HOSTING_REQUESTED);
 +    internalBuilder.setHostingRequested();
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteHostingRequested() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putOperation(TabletOperationId opId) {
 +    fetched.add(OPID);
 +    internalBuilder.putOperation(opId);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteOperation() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder putSelectedFiles(SelectedFiles selectedFiles) {
 +    fetched.add(SELECTED);
 +    internalBuilder.putSelectedFiles(selectedFiles);
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteSelectedFiles() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteAll(Set<Key> keys) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder setMerged() {
 +    fetched.add(MERGED);
 +    internalBuilder.setMerged();
 +    return this;
 +  }
 +
 +  @Override
 +  public TabletMetadataBuilder deleteMerged() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  /**
 +   * @param extraFetched Anything that was put on the builder will automatically be added to the
 +   *        fetched set. However, for the case where something was not put and it needs to be
 +   *        fetched it can be passed here. For example to simulate a tablet w/o a location it, no
 +   *        location will be put and LOCATION would be passed in via this argument.
 +   */
 +  public TabletMetadata build(TabletMetadata.ColumnType... extraFetched) {
 +    var mutation = internalBuilder.getMutation();
 +
 +    SortedMap<Key,Value> rowMap = new TreeMap<>();
 +    mutation.getUpdates().forEach(cu -> {
 +      Key k = new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(),
 +          cu.getTimestamp());
 +      Value v = new Value(cu.getValue());
 +      rowMap.put(k, v);
 +    });
 +
 +    fetched.addAll(Arrays.asList(extraFetched));
 +
 +    return TabletMetadata.convertRow(rowMap.entrySet().iterator(), fetched, true, false);
 +  }
 +
 +}
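
A short usage sketch of the builder above, in the style of the new TabletMetadataTest later in this
commit; it assembles an in-memory TabletMetadata without writing to the metadata table, and passes
LOCATION as an extra fetched column so the result reports "no location" rather than "not fetched":

    // Hypothetical test-style usage via TabletMetadata.builder(...).
    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
    TabletMetadata tm = TabletMetadata.builder(extent)
        .putDirName("dir1")
        .putFlushId(7)
        .build(TabletMetadata.ColumnType.LOCATION);
    assertNull(tm.getLocation());          // fetched, but never put
    assertEquals("dir1", tm.getDirName()); // put, so automatically added to the fetched set
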
diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
index 2792397ab7,036c9ac342..a0b83c00a8
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
@@@ -40,10 -37,7 +40,9 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
- import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
@@@ -186,17 -179,10 +185,10 @@@ public abstract class TabletMutatorBase
    }
  
    @Override
 -  public Ample.TabletMutator deleteWal(LogEntry logEntry) {
 +  public T deleteWal(LogEntry logEntry) {
      Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
-     mutation.putDelete(LogColumnFamily.NAME, logEntry.getColumnQualifier());
-     return getThis();
-   }
- 
-   @Override
-   public T deleteWal(String wal) {
-     Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
-     mutation.putDelete(LogColumnFamily.STR_NAME, wal);
+     logEntry.deleteFromMutation(mutation);
 -    return this;
 +    return getThis();
    }
  
    @Override
diff --cc core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 73a9ecf979,dea6460eaf..3343d750fc
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@@ -106,24 -147,33 +147,32 @@@ public final class LogEntry 
  
    @Override
    public int hashCode() {
-     return Objects.hash(filePath);
+     return Objects.hash(path);
    }
  
-   public static LogEntry fromMetaWalEntry(Entry<Key,Value> entry) {
-     String qualifier = entry.getKey().getColumnQualifier().toString();
-     String[] parts = qualifier.split("/", 2);
-     Preconditions.checkArgument(parts.length == 2 && parts[0].equals("-"),
-         "Malformed write-ahead log %s", qualifier);
-     return new LogEntry(parts[1]);
+   /**
+    * Get the Text that should be used as the column qualifier to store this as a metadata entry.
+    */
 -  @VisibleForTesting
 -  Text getColumnQualifier() {
++  public Text getColumnQualifier() {
+     return new Text("-/" + getPath());
    }
  
-   public String getUniqueID() {
-     String[] parts = filePath.split("/");
-     return parts[parts.length - 1];
+   /**
+    * Put a delete marker in the provided mutation for this LogEntry.
+    *
+    * @param mutation the mutation to update
+    */
+   public void deleteFromMutation(Mutation mutation) {
+     mutation.putDelete(LogColumnFamily.NAME, getColumnQualifier());
    }
  
-   public Text getColumnQualifier() {
-     return new Text("-/" + filePath);
+   /**
+    * Put this LogEntry into the provided mutation.
+    *
+    * @param mutation the mutation to update
+    */
+   public void addToMutation(Mutation mutation) {
+     mutation.put(LogColumnFamily.NAME, getColumnQualifier(), new Value());
    }
  
  }
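
A minimal sketch of how the new LogEntry helpers above fit together; the extent variable is a
placeholder, and the "host+port/uuid" path shape follows the tests in this commit:

    // Hypothetical sketch: the entry now owns its own metadata column handling.
    LogEntry entry = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
    Mutation m = new Mutation(extent.toMetaRow());
    entry.addToMutation(m);        // writes the "-/<path>" qualifier under the log column family
    entry.deleteFromMutation(m);   // writes a delete marker for that same qualifier
    UUID id = entry.getUniqueID(); // the unique ID is now a UUID rather than a String
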
diff --cc core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
index 2cbc25f57c,5287d689ba..a5f47e6a20
--- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
@@@ -318,97 -293,4 +317,97 @@@ public class TabletMetadataTest 
      });
      return rowMap;
    }
 +
 +  @Test
 +  public void testBuilder() {
 +    TServerInstance ser1 = new TServerInstance(HostAndPort.fromParts("server1", 8555), "s001");
 +
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
 +
 +    StoredTabletFile sf1 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert();
 +    DataFileValue dfv1 = new DataFileValue(89, 67);
 +
 +    StoredTabletFile sf2 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert();
 +    DataFileValue dfv2 = new DataFileValue(890, 670);
 +
 +    ReferencedTabletFile rf1 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/imp1.rf"));
 +    ReferencedTabletFile rf2 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/imp2.rf"));
 +
 +    StoredTabletFile sf3 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf")).insert();
 +    StoredTabletFile sf4 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf4.rf")).insert();
 +
 +    TabletMetadata tm = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.NEVER)
 +        .putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2)
 +        .putBulkFile(rf1, 25).putBulkFile(rf2, 35).putFlushId(27).putDirName("dir1").putScan(sf3)
 +        .putScan(sf4).putCompacted(17).putCompacted(23).build(ECOMP, HOSTING_REQUESTED, MERGED);
 +
 +    assertEquals(extent, tm.getExtent());
 +    assertEquals(TabletHostingGoal.NEVER, tm.getHostingGoal());
 +    assertEquals(Location.future(ser1), tm.getLocation());
 +    assertEquals(27L, tm.getFlushId().orElse(-1));
 +    assertEquals(Map.of(sf1, dfv1, sf2, dfv2), tm.getFilesMap());
 +    assertEquals(Map.of(rf1.insert(), 25L, rf2.insert(), 35L), tm.getLoaded());
 +    assertEquals("dir1", tm.getDirName());
 +    assertEquals(Set.of(sf3, sf4), Set.copyOf(tm.getScans()));
 +    assertEquals(Set.of(), tm.getExternalCompactions().keySet());
 +    assertEquals(Set.of(17L, 23L), tm.getCompacted());
 +    assertFalse(tm.getHostingRequested());
 +    assertFalse(tm.hasMerged());
 +    assertThrows(IllegalStateException.class, tm::getOperationId);
 +    assertThrows(IllegalStateException.class, tm::getSuspend);
 +    assertThrows(IllegalStateException.class, tm::getTime);
 +
 +    TabletOperationId opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 55);
 +    TabletMetadata tm2 = TabletMetadata.builder(extent).putOperation(opid1).build(LOCATION);
 +
 +    assertEquals(extent, tm2.getExtent());
 +    assertEquals(opid1, tm2.getOperationId());
 +    assertNull(tm2.getLocation());
 +    assertThrows(IllegalStateException.class, tm2::getFiles);
 +    assertThrows(IllegalStateException.class, tm2::getHostingGoal);
 +    assertThrows(IllegalStateException.class, tm2::getFlushId);
 +    assertThrows(IllegalStateException.class, tm2::getFiles);
 +    assertThrows(IllegalStateException.class, tm2::getLogs);
 +    assertThrows(IllegalStateException.class, tm2::getLoaded);
 +    assertThrows(IllegalStateException.class, tm2::getDirName);
 +    assertThrows(IllegalStateException.class, tm2::getScans);
 +    assertThrows(IllegalStateException.class, tm2::getExternalCompactions);
 +    assertThrows(IllegalStateException.class, tm2::getHostingRequested);
 +    assertThrows(IllegalStateException.class, tm2::getSelectedFiles);
 +    assertThrows(IllegalStateException.class, tm2::getCompacted);
 +
 +    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
 +    ExternalCompactionMetadata ecm = new ExternalCompactionMetadata(Set.of(sf1, sf2), rf1, "cid1",
 +        CompactionKind.USER, (short) 3, CompactionExecutorIdImpl.externalId("Q1"), true, 99L);
 +
-     LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID());
-     LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID());
++    LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
++    LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
 +
 +    SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, 159L);
 +
 +    TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm)
 +        .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1)
 +        .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged().build();
 +
 +    assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet());
 +    assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles());
 +    assertEquals(ser1.getHostAndPort(), tm3.getSuspend().server);
 +    assertEquals(45L, tm3.getSuspend().suspensionTime);
 +    assertEquals(new MetadataTime(479, TimeType.LOGICAL), tm3.getTime());
 +    assertTrue(tm3.getHostingRequested());
 +    assertEquals(Stream.of(le1, le2).map(LogEntry::toString).collect(toSet()),
 +        tm3.getLogs().stream().map(LogEntry::toString).collect(toSet()));
 +    assertEquals(Set.of(sf1, sf4), tm3.getSelectedFiles().getFiles());
 +    assertEquals(159L, tm3.getSelectedFiles().getFateTxId());
 +    assertFalse(tm3.getSelectedFiles().initiallySelectedAll());
 +    assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue());
 +    assertTrue(tm3.hasMerged());
 +  }
 +
  }
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 140f23cae5,19841ac7a3..b219a73ffa
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@@ -130,59 -117,37 +119,59 @@@ public class VolumeUtil 
      }
    }
  
 -  /**
 -   * This method does two things. First, it switches any volumes a tablet is using that are
 -   * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured
 -   * for use it chooses a new tablet directory.
 -   */
 -  public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock,
 -      KeyExtent extent, TabletFiles tabletFiles) {
 -    List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
 +  public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements,
 +      final TabletMetadata tm) {
      if (replacements.isEmpty()) {
 -      return tabletFiles;
 +      return false;
      }
 -    log.trace("Using volume replacements: {}", replacements);
  
 -    List<LogEntry> logsToRemove = new ArrayList<>();
 -    List<LogEntry> logsToAdd = new ArrayList<>();
 +    MutableBoolean needsReplacement = new MutableBoolean(false);
 +
 +    Consumer<LogEntry> consumer = le -> needsReplacement.setTrue();
 +
 +    volumeReplacementEvaluation(replacements, tm, consumer, consumer,
 +        f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue());
 +
 +    return needsReplacement.booleanValue();
 +  }
  
 -    List<StoredTabletFile> filesToRemove = new ArrayList<>();
 -    SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>();
 +  public static class VolumeReplacements {
 +    public final TabletMetadata tabletMeta;
 +    public final List<LogEntry> logsToRemove = new ArrayList<>();
 +    public final List<LogEntry> logsToAdd = new ArrayList<>();
 +    public final List<StoredTabletFile> filesToRemove = new ArrayList<>();
 +    public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>();
  
 -    TabletFiles ret = new TabletFiles();
 +    public VolumeReplacements(TabletMetadata tabletMeta) {
 +      this.tabletMeta = tabletMeta;
 +    }
 +  }
  
 -    for (LogEntry logEntry : tabletFiles.logEntries) {
 +  public static VolumeReplacements
 +      computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) {
 +    var vr = new VolumeReplacements(tm);
 +    volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add,
 +        vr.filesToRemove::add, vr.filesToAdd::put);
 +    return vr;
 +  }
 +
 +  public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements,
 +      final TabletMetadata tm, final Consumer<LogEntry> logsToRemove,
 +      final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove,
 +      final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) {
 +    if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) {
 +      return;
 +    }
 +
 +    log.trace("Using volume replacements: {}", replacements);
 +    for (LogEntry logEntry : tm.getLogs()) {
 +      log.trace("Evaluating walog {} for replacement.", logEntry);
        LogEntry switchedLogEntry = switchVolumes(logEntry, replacements);
        if (switchedLogEntry != null) {
 -        logsToRemove.add(logEntry);
 -        logsToAdd.add(switchedLogEntry);
 -        ret.logEntries.add(switchedLogEntry);
 -        log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getPath(),
 +        logsToRemove.accept(logEntry);
 +        logsToAdd.accept(switchedLogEntry);
-         log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(),
-             switchedLogEntry.getFilePath());
++        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getPath(),
+             switchedLogEntry.getPath());
 -      } else {
 -        ret.logEntries.add(logEntry);
        }
      }
  
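
For orientation, a hedged sketch of how the two new entry points above could be combined by a
caller; context, tabletMetadata, and tabletMutator are placeholders for a ServerContext, a
TabletMetadata read from Ample, and an Ample.TabletUpdates implementation:

    // Hypothetical caller: cheap check first, then compute the concrete changes.
    List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
    if (VolumeUtil.needsVolumeReplacement(replacements, tabletMetadata)) {
      VolumeUtil.VolumeReplacements vr =
          VolumeUtil.computeVolumeReplacements(replacements, tabletMetadata);
      vr.logsToAdd.forEach(tabletMutator::putWal);         // switched WAL entries
      vr.logsToRemove.forEach(tabletMutator::deleteWal);   // originals on replaced volumes
      vr.filesToAdd.forEach(tabletMutator::putFile);       // switched data files
      vr.filesToRemove.forEach(tabletMutator::deleteFile); // originals on replaced volumes
    }
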
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
index 526e636f32,0000000000..7d2a09f686
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
@@@ -1,181 -1,0 +1,181 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.hadoop.fs.Path;
 +
 +import com.google.common.base.Preconditions;
 +
 +public abstract class AbstractTabletStateStore implements TabletStateStore {
 +
 +  private final Ample ample;
 +
 +  protected AbstractTabletStateStore(ClientContext context) {
 +    this.ample = context.getAmple();
 +  }
 +
 +  @Override
 +  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (Assignment assignment : assignments) {
 +        var conditionalMutator = tabletsMutator.mutateTablet(assignment.tablet)
 +            .requireLocation(TabletMetadata.Location.future(assignment.server))
 +            .putLocation(TabletMetadata.Location.current(assignment.server))
 +            .deleteLocation(TabletMetadata.Location.future(assignment.server)).deleteSuspension();
 +
 +        updateLastLocation(conditionalMutator, assignment.server, assignment.lastLocation);
 +
 +        conditionalMutator.submit(tabletMetadata -> {
 +          Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
 +          return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
 +              .equals(TabletMetadata.Location.current(assignment.server));
 +        });
 +      }
 +
 +      if (tabletsMutator.process().values().stream()
 +          .anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
 +        throw new DistributedStoreException(
 +            "failed to set tablet location, conditional mutation failed");
 +      }
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +
 +  @Override
 +  public void setFutureLocations(Collection<Assignment> assignments)
 +      throws DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (Assignment assignment : assignments) {
 +        tabletsMutator.mutateTablet(assignment.tablet).requireAbsentOperation()
 +            .requireAbsentLocation().deleteSuspension()
 +            .putLocation(TabletMetadata.Location.future(assignment.server))
 +            .submit(tabletMetadata -> {
 +              Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
 +              return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
 +                  .equals(TabletMetadata.Location.future(assignment.server));
 +            });
 +      }
 +
 +      var results = tabletsMutator.process();
 +
 +      if (results.values().stream().anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
 +        throw new DistributedStoreException(
 +            "failed to set tablet location, conditional mutation failed. ");
 +      }
 +
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +
 +  @Override
 +  public void unassign(Collection<TabletMetadata> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
 +    unassign(tablets, logsForDeadServers, -1);
 +  }
 +
 +  @Override
 +  public void suspend(Collection<TabletMetadata> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
 +      throws DistributedStoreException {
 +    unassign(tablets, logsForDeadServers, suspensionTimestamp);
 +  }
 +
 +  protected abstract void processSuspension(Ample.ConditionalTabletMutator tabletMutator,
 +      TabletMetadata tm, long suspensionTimestamp);
 +
 +  private void unassign(Collection<TabletMetadata> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp)
 +      throws DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (TabletMetadata tm : tablets) {
 +        if (tm.getLocation() == null) {
 +          continue;
 +        }
 +
 +        var tabletMutator =
 +            tabletsMutator.mutateTablet(tm.getExtent()).requireLocation(tm.getLocation());
 +
 +        if (tm.hasCurrent()) {
 +
 +          updateLastLocation(tabletMutator, tm.getLocation().getServerInstance(), tm.getLast());
 +          tabletMutator.deleteLocation(tm.getLocation());
 +          if (logsForDeadServers != null) {
 +            List<Path> logs = logsForDeadServers.get(tm.getLocation().getServerInstance());
 +            if (logs != null) {
 +              for (Path log : logs) {
-                 LogEntry entry = new LogEntry(log.toString());
++                LogEntry entry = LogEntry.fromPath(log.toString());
 +                tabletMutator.putWal(entry);
 +              }
 +            }
 +          }
 +        }
 +
 +        if (tm.getLocation() != null && tm.getLocation().getType() != null
 +            && tm.getLocation().getType().equals(LocationType.FUTURE)) {
 +          tabletMutator.deleteLocation(tm.getLocation());
 +        }
 +
 +        processSuspension(tabletMutator, tm, suspensionTimestamp);
 +
 +        tabletMutator.submit(tabletMetadata -> tabletMetadata.getLocation() == null);
 +      }
 +
 +      Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
 +
 +      if (results.values().stream()
 +          .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED)) {
 +        throw new DistributedStoreException("Some unassignments did not satisfy conditions.");
 +      }
 +
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +
 +  protected static void updateLastLocation(Ample.TabletUpdates<?> tabletMutator,
 +      TServerInstance location, Location lastLocation) {
 +    Preconditions.checkArgument(
 +        lastLocation == null || lastLocation.getType() == TabletMetadata.LocationType.LAST);
 +    Location newLocation = Location.last(location);
 +    if (lastLocation != null) {
 +      if (!lastLocation.equals(newLocation)) {
 +        tabletMutator.deleteLocation(lastLocation);
 +        tabletMutator.putLocation(newLocation);
 +      }
 +    } else {
 +      tabletMutator.putLocation(newLocation);
 +    }
 +  }
 +
 +}
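
To show how the processSuspension(...) hook above is meant to be filled in, a hedged fragment of a
concrete store; only the override is shown, a real subclass would also implement the rest of
TabletStateStore, and the ">= 0 means suspend" convention is an assumption made here:

    // Hypothetical override inside a TabletStateStore subclass.
    @Override
    protected void processSuspension(Ample.ConditionalTabletMutator tabletMutator,
        TabletMetadata tm, long suspensionTimestamp) {
      if (suspensionTimestamp >= 0) {
        tabletMutator.putSuspension(tm.getLocation().getServerInstance(), suspensionTimestamp);
      } else {
        tabletMutator.deleteSuspension();
      }
    }
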
diff --cc server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
index fafa79f696,0000000000..caa9c926f8
mode 100644,000000..100644
--- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
@@@ -1,188 -1,0 +1,188 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
 +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.util.ArrayList;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +
 +public class TabletManagementTest {
 +
 +  private SortedMap<Key,Value> toRowMap(Mutation mutation) {
 +    SortedMap<Key,Value> rowMap = new TreeMap<>();
 +    mutation.getUpdates().forEach(cu -> {
 +      Key k = new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(),
 +          cu.getTimestamp());
 +      Value v = new Value(cu.getValue());
 +      rowMap.put(k, v);
 +    });
 +    return rowMap;
 +  }
 +
 +  private SortedMap<Key,Value> createMetadataEntryKV(KeyExtent extent) {
 +
 +    Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
 +
 +    DIRECTORY_COLUMN.put(mutation, new Value("t-0001757"));
 +    FLUSH_COLUMN.put(mutation, new Value("6"));
 +    TIME_COLUMN.put(mutation, new Value("M123456789"));
 +
 +    StoredTabletFile bf1 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf1")).insert();
 +    StoredTabletFile bf2 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf2")).insert();
 +    mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1.getMetadata())
 +        .put(FateTxId.formatTid(56));
 +    mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2.getMetadata())
 +        .put(FateTxId.formatTid(59));
 +
 +    mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK");
 +
 +    DataFileValue dfv1 = new DataFileValue(555, 23);
 +    StoredTabletFile tf1 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df1.rf")).insert();
 +    StoredTabletFile tf2 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df2.rf")).insert();
 +    mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf1.getMetadata()).put(dfv1.encode());
 +    DataFileValue dfv2 = new DataFileValue(234, 13);
 +    mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf2.getMetadata()).put(dfv2.encode());
 +
 +    mutation.at().family(CurrentLocationColumnFamily.NAME).qualifier("s001").put("server1:8555");
 +
 +    mutation.at().family(LastLocationColumnFamily.NAME).qualifier("s000").put("server2:8555");
 +
-     LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID());
++    LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
 +    le1.addToMutation(mutation);
-     LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID());
++    LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
 +    le2.addToMutation(mutation);
 +
 +    StoredTabletFile sf1 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert();
 +    StoredTabletFile sf2 =
 +        new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert();
 +    mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put("");
 +    mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put("");
 +
 +    return toRowMap(mutation);
 +
 +  }
 +
 +  @Test
 +  public void testEncodeDecodeWithReasons() throws Exception {
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
 +
 +    final Set<ManagementAction> actions =
 +        Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING);
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addActions(entries, actions);
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    // Remove the REASONS column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow().toString(), "REASONS", ""));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals(actions, tmi.getActions());
 +  }
 +
 +  @Test
 +  public void testEncodeDecodeWithErrors() throws Exception {
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addError(entries, new UnsupportedOperationException("Not supported."));
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    // Remove the ERROR column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow().toString(), "ERROR", ""));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals("Not supported.", tmi.getErrorMessage());
 +  }
 +
 +  @Test
 +  public void testBinary() throws Exception {
 +    // test end row with non ascii data
 +    Text endRow = new Text(new byte[] {'m', (byte) 0xff});
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), endRow, new Text("da"));
 +
 +    final Set<ManagementAction> actions =
 +        Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING);
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addActions(entries, actions);
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    assertTrue(entries.keySet().stream().allMatch(k -> k.getRow().equals(extent.toMetaRow())));
 +
 +    // Remove the REASONS column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow(), new Text("REASONS"), new Text("")));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals(actions, tmi.getActions());
 +
 +  }
 +}
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 493b64afb4,807ba9f690..4b4d35918b
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo
        }
       // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
        // that made the WALs.
 -      for (Collection<String> wals : state.walogs) {
 -        for (String wal : wals) {
 -          UUID walUUID = path2uuid(new Path(wal));
 -          TServerInstance dead = result.get(walUUID);
 -          // There's a reference to a log file, so skip that server's logs
 -          Set<UUID> idsToIgnore = candidates.remove(dead);
 -          if (idsToIgnore != null) {
 -            result.keySet().removeAll(idsToIgnore);
 -            recoveryLogs.keySet().removeAll(idsToIgnore);
 -          }
 +      for (LogEntry wals : tabletMetadata.getLogs()) {
-         String wal = wals.getFilePath();
++        String wal = wals.getPath();
 +        UUID walUUID = path2uuid(new Path(wal));
 +        TServerInstance dead = result.get(walUUID);
 +        // There's a reference to a log file, so skip that server's logs
 +        Set<UUID> idsToIgnore = candidates.remove(dead);
 +        if (idsToIgnore != null) {
 +          result.keySet().removeAll(idsToIgnore);
 +          recoveryLogs.keySet().removeAll(idsToIgnore);
          }
        }
      }
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 9cb41a90c1,86bc06930c..7c66d84a2f
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager 
      }
    }
  
 -  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs)
 -      throws IOException {
 +  public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws IOException {
      boolean recoveryNeeded = false;
  
 -    for (Collection<String> logs : walogs) {
 -      for (String walog : logs) {
 -
 -        Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL,
 -            manager.getContext().getVolumeReplacements());
 -        if (switchedWalog != null) {
 -          // replaces the volume used for sorting, but do not change entry in metadata table. When
 -          // the tablet loads it will change the metadata table entry. If
 -          // the tablet has the same replacement config, then it will find the sorted log.
 -          log.info("Volume replaced {} -> {}", walog, switchedWalog);
 -          walog = switchedWalog.toString();
 -        }
 +    for (LogEntry entry : walogs) {
-       String walog = entry.getFilePath();
++      String walog = entry.getPath();
 +
 +      Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL,
 +          manager.getContext().getVolumeReplacements());
 +      if (switchedWalog != null) {
 +        // replaces the volume used for sorting, but do not change entry in metadata table. When
 +        // the tablet loads it will change the metadata table entry. If
 +        // the tablet has the same replacement config, then it will find the sorted log.
 +        log.info("Volume replaced {} -> {}", walog, switchedWalog);
 +        walog = switchedWalog.toString();
 +      }
  
 -        String[] parts = walog.split("/");
 -        String sortId = parts[parts.length - 1];
 -        String filename = new Path(walog).toString();
 -        String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
 +      String[] parts = walog.split("/");
 +      String sortId = parts[parts.length - 1];
 +      String filename = new Path(walog).toString();
 +      String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
 +
 +      boolean sortQueued;
 +      synchronized (this) {
 +        sortQueued = sortsQueued.contains(sortId);
 +      }
  
 -        boolean sortQueued;
 +      if (sortQueued
 +          && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId)
 +              == null) {
          synchronized (this) {
 -          sortQueued = sortsQueued.contains(sortId);
 +          sortsQueued.remove(sortId);
          }
 +      }
  
 -        if (sortQueued
 -            && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId)
 -                == null) {
 -          synchronized (this) {
 -            sortsQueued.remove(sortId);
 -          }
 +      if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 +        synchronized (this) {
 +          closeTasksQueued.remove(sortId);
 +          recoveryDelay.remove(sortId);
 +          sortsQueued.remove(sortId);
          }
 +        continue;
 +      }
  
 -        if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 -          synchronized (this) {
 -            closeTasksQueued.remove(sortId);
 -            recoveryDelay.remove(sortId);
 -            sortsQueued.remove(sortId);
 +      recoveryNeeded = true;
 +      synchronized (this) {
 +        if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
 +          AccumuloConfiguration aconf = manager.getConfiguration();
 +          LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 +              Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser());
 +          Long delay = recoveryDelay.get(sortId);
 +          if (delay == null) {
 +            delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 +          } else {
 +            delay = Math.min(2 * delay, 1000 * 60 * 5L);
            }
 -          continue;
 -        }
  
 -        recoveryNeeded = true;
 -        synchronized (this) {
 -          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
 -            AccumuloConfiguration aconf = manager.getConfiguration();
 -            LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 -                Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser());
 -            Long delay = recoveryDelay.get(sortId);
 -            if (delay == null) {
 -              delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 -            } else {
 -              delay = Math.min(2 * delay, 1000 * 60 * 5L);
 -            }
 -
 -            log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 -                (delay / 1000), extent);
 -
 -            ScheduledFuture<?> future = executor.schedule(
 -                new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
 -            ThreadPools.watchNonCriticalScheduledTask(future);
 -            closeTasksQueued.add(sortId);
 -            recoveryDelay.put(sortId, delay);
 -          }
 +          log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 +              (delay / 1000), extent);
 +
 +          ScheduledFuture<?> future = executor.schedule(
 +              new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
 +          ThreadPools.watchNonCriticalScheduledTask(future);
 +          closeTasksQueued.add(sortId);
 +          recoveryDelay.put(sortId, delay);
          }
        }
      }
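
Aside on the retry scheduling above: the delay passed to executor.schedule() is a capped exponential backoff. The first attempt for a given sort id waits the configured MANAGER_RECOVERY_DELAY, and each later attempt doubles the previous delay up to a five minute ceiling (1000 * 60 * 5 ms). A minimal standalone sketch of that policy, assuming an illustrative 10 second initial delay (the class and helper names below are made up and are not part of this commit):

    import java.util.concurrent.TimeUnit;

    final class RecoveryBackoffSketch {
      // Stand-in for the value read from Property.MANAGER_RECOVERY_DELAY (assumed 10s here).
      static final long INITIAL_DELAY_MS = TimeUnit.SECONDS.toMillis(10);
      static final long MAX_DELAY_MS = TimeUnit.MINUTES.toMillis(5);

      // previousDelay is null the first time a given sort id is scheduled.
      static long nextDelayMillis(Long previousDelay) {
        return previousDelay == null ? INITIAL_DELAY_MS : Math.min(2 * previousDelay, MAX_DELAY_MS);
      }

      public static void main(String[] args) {
        Long delay = null;
        for (int attempt = 1; attempt <= 7; attempt++) {
          delay = nextDelayMillis(delay);
          System.out.println("attempt " + attempt + " -> " + (delay / 1000) + "s");
        }
        // With the assumed 10s start this prints 10s, 20s, 40s, 80s, 160s, 300s, 300s.
      }
    }

The cap simply bounds how long the manager waits between successive attempts to close and sort the same walog.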
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index a1619d1697,eb283b6b46..b8cc36b6c9
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@@ -82,10 -88,15 +82,11 @@@ import org.apache.accumulo.core.summary
  import org.apache.accumulo.core.summary.SummaryCollection;
  import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
  import org.apache.accumulo.core.tablet.thrift.TabletManagementClientService;
 -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo;
  import org.apache.accumulo.core.tabletingest.thrift.TDurability;
  import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
+ import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
  import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
 -import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
  import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
  import org.apache.accumulo.core.trace.TraceUtil;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 842d881ecd,9d6043ef3e..1f2870cc62
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -1294,72 -2091,19 +1293,72 @@@ public class Tablet extends TabletBase 
     * Update tablet file data from flush. Returns a StoredTabletFile if there are data entries.
     */
    public Optional<StoredTabletFile> updateTabletDataFile(long 
maxCommittedTime,
-       ReferencedTabletFile newDatafile, DataFileValue dfv, Set<String> 
unusedWalLogs, long flushId,
-       MinorCompactionReason mincReason) {
+       ReferencedTabletFile newDatafile, DataFileValue dfv, Set<LogEntry> 
unusedWalLogs,
 -      long flushId) {
 -    synchronized (timeLock) {
 -      if (maxCommittedTime > persistedTime) {
 -        persistedTime = maxCommittedTime;
 -      }
++      long flushId, MinorCompactionReason mincReason) {
 +
 +    Preconditions.checkState(refreshLock.isHeldByCurrentThread());
 +
 +    // Read these once so that, if there are buggy race conditions, the logging is consistent.
 +    // If all other code is locking properly, these should not change during this method.
 +    var lastTabletMetadata = getMetadata();
 +    var expectedTime = lastTabletMetadata.getTime();
 +
 +    // Expect time to only move forward from what was recently seen in metadata table.
 +    Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime());
  
 -      return 
ManagerMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent,
 -          newDatafile, dfv, tabletTime.getMetadataTime(persistedTime),
 -          tabletServer.getTabletSession(), tabletServer.getLock(), 
unusedWalLogs, lastLocation,
 -          flushId);
 +    // The tablet time is used to determine if the write succeeded; for that to work, the tablet
 +    // time needs to be different from what is currently stored in the metadata table.
 +    while (maxCommittedTime == expectedTime.getTime()) {
 +      var nextTime = tabletTime.getAndUpdateTime();
 +      Preconditions.checkState(nextTime >= maxCommittedTime);
 +      if (nextTime > maxCommittedTime) {
 +        maxCommittedTime++;
 +      }
      }
  
 +    try (var tabletsMutator = 
getContext().getAmple().conditionallyMutateTablets()) {
 +
 +      var expectedLocation = mincReason == MinorCompactionReason.RECOVERY
 +          ? Location.future(tabletServer.getTabletSession())
 +          : Location.current(tabletServer.getTabletSession());
 +
 +      var tablet = 
tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation)
 +          .requireSame(lastTabletMetadata, ColumnType.TIME);
 +
 +      Optional<StoredTabletFile> newFile = Optional.empty();
 +
 +      // if entries are present, write the new file's path to the metadata table
 +      if (dfv.getNumEntries() > 0) {
 +        tablet.putFile(newDatafile, dfv);
 +        newFile = Optional.of(newDatafile.insert());
 +      }
 +
 +      var newTime = tabletTime.getMetadataTime(maxCommittedTime);
 +      tablet.putTime(newTime);
 +
 +      tablet.putFlushId(flushId);
 +
 +      unusedWalLogs.forEach(tablet::deleteWal);
 +
 +      tablet.putZooLock(getContext().getZooKeeperRoot(), 
tabletServer.getLock());
 +
 +      // When trying to determine if the write was successful, check if the time was updated.
 +      // Cannot check if the new file exists, for two reasons. First, it could be compacted away
 +      // between the write and the check. Second, some flushes do not produce a file.
 +      tablet.submit(tabletMetadata -> 
tabletMetadata.getTime().equals(newTime));
 +
 +      var result = tabletsMutator.process().get(extent);
 +      if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +        log.error("Metadata for failed tablet file update : {}", 
result.readMetadata());
 +
 +        // Include the things that could have caused the write to fail.
 +        throw new IllegalStateException("Unable to write minor compaction.  " + extent + " "
 +            + expectedLocation + " " + expectedTime);
 +      }
 +
 +      return newFile;
 +    }
    }
  
    @Override
@@@ -1416,106 -2192,12 +1415,106 @@@
      return timer.getTabletStats();
    }
  
 -  private static String createTabletDirectoryName(ServerContext context, Text 
endRow) {
 -    if (endRow == null) {
 -      return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
 -    } else {
 -      UniqueNameAllocator namer = context.getUniqueNameAllocator();
 -      return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + 
namer.getNextName();
 +  public boolean isOnDemand() {
 +    // TODO a change in the hosting goal could refresh online tablets
 +    return getMetadata().getHostingGoal() == TabletHostingGoal.ONDEMAND;
 +  }
 +
 +  // The purpose of this lock is to prevent race conditions between concurrent refresh RPC calls and
 +  // between minor compactions and refresh calls.
 +  private final ReentrantLock refreshLock = new ReentrantLock();
 +
 +  void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile,
 +      ReferencedTabletFile newDatafile, DataFileValue dfv, CommitSession 
commitSession,
 +      long flushId, MinorCompactionReason mincReason) {
 +    Optional<StoredTabletFile> newFile;
 +    // rename before putting in metadata table, so files in metadata table should
 +    // always exist
 +    boolean attemptedRename = false;
 +    VolumeManager vm = getTabletServer().getContext().getVolumeManager();
 +    do {
 +      try {
 +        if (dfv.getNumEntries() == 0) {
 +          log.debug("No data entries so delete temporary file {}", 
tmpDatafile);
 +          vm.deleteRecursively(tmpDatafile.getPath());
 +        } else {
 +          if (!attemptedRename && vm.exists(newDatafile.getPath())) {
 +            log.warn("Target data file already exists {}", newDatafile);
 +            throw new RuntimeException("File unexpectedly exists " + 
newDatafile.getPath());
 +          }
 +          // the following checks for a spurious rename failure, i.e. the rename actually
 +          // succeeded but gave an IoE
 +          if (attemptedRename && vm.exists(newDatafile.getPath())
 +              && !vm.exists(tmpDatafile.getPath())) {
 +            // seems like previous rename succeeded, so break
 +            break;
 +          }
 +          attemptedRename = true;
 +          ScanfileManager.rename(vm, tmpDatafile.getPath(), 
newDatafile.getPath());
 +        }
 +        break;
 +      } catch (IOException ioe) {
 +        log.warn("Tablet " + getExtent() + " failed to rename " + newDatafile
 +            + " after MinC, will retry in 60 secs...", ioe);
 +        sleepUninterruptibly(1, TimeUnit.MINUTES);
 +      }
 +    } while (true);
 +
 +    // The refresh lock must be held for the metadata write that adds the new file to the tablet.
 +    // This prevents a concurrent refresh operation from pulling in the new tablet file before the
 +    // in memory map reference related to the file is deactivated. Scans should use either the in
 +    // memory map or the new file, never both.
 +    Preconditions.checkState(!getLogLock().isHeldByCurrentThread());
 +    refreshLock.lock();
 +    try {
 +      // Cannot hold the tablet lock while acquiring the log lock. The following check is there to
 +      // prevent deadlock.
 +      getLogLock().lock();
 +      // do not place any code here between lock and try
 +      try {
 +        // The following call pairs with tablet.finishClearingUnusedLogs() later in this block.
 +        // If this call is moved, examine it and finishClearingUnusedLogs() before moving it.
-         Set<String> unusedWalLogs = beginClearingUnusedLogs();
++        Set<LogEntry> unusedWalLogs = beginClearingUnusedLogs();
 +        // the order of writing to metadata and walog is important in the face of machine/process
 +        // failures: need to write to metadata before writing to walog, because when things are
 +        // done in the reverse order data could be lost... the minor compaction start event should
 +        // be written before the following metadata write is made
 +
 +        newFile = updateTabletDataFile(commitSession.getMaxCommittedTime(), 
newDatafile, dfv,
 +            unusedWalLogs, flushId, mincReason);
 +
 +        finishClearingUnusedLogs();
 +      } finally {
 +        getLogLock().unlock();
 +      }
 +
 +      // Without the refresh lock, if a refresh happened here it could make the new file written to
 +      // the metadata table above available for scans while the in memory map from which the file
 +      // was produced is still available for scans
 +
 +      do {
 +        try {
 +          // the reason this update uses the new commit session, instead of the old one passed in,
 +          // is that the new one will reference the logs used by current memory...
 +          
getTabletServer().minorCompactionFinished(getTabletMemory().getCommitSession(),
 +              commitSession.getWALogSeq() + 2);
 +          break;
 +        } catch (IOException e) {
 +          log.error("Failed to write to write-ahead log " + e.getMessage() + 
" will retry", e);
 +          sleepUninterruptibly(1, TimeUnit.SECONDS);
 +        }
 +      } while (true);
 +
 +      refreshMetadata(RefreshPurpose.MINC_COMPLETION);
 +    } finally {
 +      refreshLock.unlock();
 +    }
 +    TabletLogger.flushed(getExtent(), newFile);
 +
 +    long splitSize = 
getTableConfiguration().getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
 +    if (dfv.getSize() > splitSize) {
 +      log.debug(String.format("Minor Compaction wrote out file larger than 
split threshold."
 +          + " split threshold = %,d  file size = %,d", splitSize, 
dfv.getSize()));
      }
    }
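
One detail in updateTabletDataFile() above is easy to miss: whether the conditional mutation was applied is later judged by re-reading the tablet time and comparing it to the time that was written, so the written time must differ from what is currently stored. A minimal sketch of that "make the write observable" rule, separate from Accumulo (every name below is hypothetical, and the real method advances the tablet's logical clock rather than adding one):

    final class ObservableWriteSketch {

      // Pick a time to write that is guaranteed to differ from the currently stored time,
      // assuming the caller already verified maxCommittedTime >= storedTime.
      static long chooseNewTime(long maxCommittedTime, long storedTime) {
        return maxCommittedTime == storedTime ? storedTime + 1 : maxCommittedTime;
      }

      // After an ambiguous result, decide whether the write landed by re-reading the time.
      static boolean writeWasApplied(long reReadTime, long newTime) {
        return reReadTime == newTime;
      }

      public static void main(String[] args) {
        long stored = 100;
        long newTime = chooseNewTime(100, stored);              // 101, never 100
        System.out.println(writeWasApplied(stored, newTime));   // false: write did not land
        System.out.println(writeWasApplied(newTime, newTime));  // true: write landed
      }
    }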
  
diff --cc shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java
index 14856bb9e7,583819b96b..0730c80245
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java
+++ b/shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java
@@@ -128,50 -113,6 +128,50 @@@ public class ListTabletsCommandTest 
    public void mockTest() throws Exception {
      ListTabletsCommand cmd = new TestListTabletsCommand();
  
 +    TableId tableId = TableId.of("123");
 +
 +    TServerInstance ser1 = new 
TServerInstance(HostAndPort.fromParts("server1", 8555), "s001");
 +    TServerInstance ser2 = new 
TServerInstance(HostAndPort.fromParts("server2", 2354), "s002");
 +
 +    StoredTabletFile sf11 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-dir1/sf11.rf")).insert();
 +    DataFileValue dfv11 = new DataFileValue(5643, 89);
 +
 +    StoredTabletFile sf12 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-dir1/sf12.rf")).insert();
 +    DataFileValue dfv12 = new DataFileValue(379963, 1027);
 +
 +    StoredTabletFile sf21 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-dir2/sf21.rf")).insert();
 +    DataFileValue dfv21 = new DataFileValue(5323, 142);
 +
 +    StoredTabletFile sf31 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-dir3/sf31.rf")).insert();
 +    DataFileValue dfv31 = new DataFileValue(95832L, 231);
 +
 +    KeyExtent extent = new KeyExtent(tableId, new Text("d"), null);
 +
-     LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID());
-     LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID());
++    LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
++    LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
 +
 +    TabletMetadata tm1 = 
TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.ONDEMAND)
 +        .putLocation(TabletMetadata.Location.current(ser1)).putFile(sf11, 
dfv11)
 +        .putFile(sf12, dfv12).putWal(le1).putDirName("t-dir1").build();
 +
 +    extent = new KeyExtent(tableId, new Text("k"), new Text("e"));
 +    TabletMetadata tm2 = 
TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.ALWAYS)
 +        .putLocation(TabletMetadata.Location.current(ser2)).putFile(sf21, 
dfv21)
 +        .putDirName("t-dir2").build(LOGS);
 +
 +    extent = new KeyExtent(tableId, null, new Text("l"));
 +    TabletMetadata tm3 = 
TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.NEVER)
 +        .putFile(sf31, 
dfv31).putWal(le1).putWal(le2).putDirName("t-dir3").build(LOCATION);
 +
 +    TabletInformationImpl[] tabletInformation = new TabletInformationImpl[3];
 +    tabletInformation[0] = new TabletInformationImpl(tm1, "HOSTED");
 +    tabletInformation[1] = new TabletInformationImpl(tm2, "HOSTED");
 +    tabletInformation[2] = new TabletInformationImpl(tm3, "UNASSIGNED");
 +
      AccumuloClient client = EasyMock.createMock(AccumuloClient.class);
      ClientContext context = EasyMock.createMock(ClientContext.class);
      TableOperations tableOps = EasyMock.createMock(TableOperations.class);
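
The test changes above track the LogEntry construction change in this merge: entries are now built with the LogEntry.fromPath(...) factory, and the tserver address embedded in the walog path uses a '+' between host and port (localhost+8020) rather than ':'. A small sketch of assembling such a path (the buildWalPath helper is illustrative only, not an Accumulo API):

    import java.util.UUID;

    final class WalPathSketch {
      // "host+port/uuid", mirroring the strings fed to LogEntry.fromPath in the tests above.
      static String buildWalPath(String host, int port, UUID walId) {
        return host + "+" + port + "/" + walId;
      }

      public static void main(String[] args) {
        String path = buildWalPath("localhost", 8020, UUID.randomUUID());
        System.out.println(path); // e.g. localhost+8020/0b5e9a3c-...
        // In the tests above this string would then be passed to LogEntry.fromPath(path).
      }
    }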
diff --cc test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 0633b934f8,0000000000..ebc86f97c8
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@@ -1,967 -1,0 +1,967 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static com.google.common.collect.MoreCollectors.onlyElement;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
 +import static org.apache.accumulo.core.util.LazySingletons.GSON;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertNotEquals;
 +import static org.junit.jupiter.api.Assertions.assertNull;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Consumer;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TimeType;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import 
org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.MetadataTime;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
 +import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.collect.Sets;
 +
 +public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 +
 +  // ELASTICITY_TODO ensure that all conditional updates are tested
 +
 +  private TableId tid;
 +  private KeyExtent e1;
 +  private KeyExtent e2;
 +  private KeyExtent e3;
 +  private KeyExtent e4;
 +
 +  @BeforeEach
 +  public void setupTable() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = getUniqueNames(1)[0];
 +
 +      SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new 
Text("f"), new Text("j")));
 +      c.tableOperations().create(tableName,
 +          new NewTableConfiguration().withSplits(splits).createOffline());
 +
 +      c.securityOperations().grantTablePermission("root", MetadataTable.NAME,
 +          TablePermission.WRITE);
 +
 +      tid = TableId.of(c.tableOperations().tableIdMap().get(tableName));
 +
 +      e1 = new KeyExtent(tid, new Text("c"), null);
 +      e2 = new KeyExtent(tid, new Text("f"), new Text("c"));
 +      e3 = new KeyExtent(tid, new Text("j"), new Text("f"));
 +      e4 = new KeyExtent(tid, null, new Text("j"));
 +    }
 +  }
 +
 +  @Test
 +  public void testLocations() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      var ts1 = new TServerInstance("localhost:9997", 5000L);
 +      var ts2 = new TServerInstance("localhost:9997", 6000L);
 +
 +      var context = cluster.getServerContext();
 +
 +      assertNull(context.getAmple().readTablet(e1).getLocation());
 +
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts1)).submit(tm -> false);
 +      var results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts2)).submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
 +          
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
 +          
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2))
 +          
.putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2))
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
 +          .deleteLocation(Location.current(ts1)).submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertNull(context.getAmple().readTablet(e1).getLocation());
 +    }
 +  }
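
For readers new to the conditional mutation API, testLocations above exercises the whole assignment handshake: a future location may only be set while no location exists, it is then swapped for a current location, and finally the current location is deleted. Condensed into one helper, the accepted path looks roughly like the sketch below; it reuses only calls shown in the test, while the ServerContext parameter type and the assert-based checks are assumptions, and the rejected cases are omitted:

    import org.apache.accumulo.core.dataImpl.KeyExtent;
    import org.apache.accumulo.core.metadata.TServerInstance;
    import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
    import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
    import org.apache.accumulo.server.ServerContext;
    import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;

    final class AssignmentSketch {
      // Happy path: absent -> future(ts) -> current(ts) -> absent.
      static void assign(ServerContext context, KeyExtent extent, TServerInstance ts) {
        var ctmi = new ConditionalTabletsMutatorImpl(context);
        ctmi.mutateTablet(extent).requireAbsentOperation().requireAbsentLocation()
            .putLocation(Location.future(ts)).submit(tm -> false);
        assert ctmi.process().get(extent).getStatus() == Status.ACCEPTED;

        ctmi = new ConditionalTabletsMutatorImpl(context);
        ctmi.mutateTablet(extent).requireAbsentOperation().requireLocation(Location.future(ts))
            .putLocation(Location.current(ts)).deleteLocation(Location.future(ts))
            .submit(tm -> false);
        assert ctmi.process().get(extent).getStatus() == Status.ACCEPTED;

        ctmi = new ConditionalTabletsMutatorImpl(context);
        ctmi.mutateTablet(extent).requireAbsentOperation().requireLocation(Location.current(ts))
            .deleteLocation(Location.current(ts)).submit(tm -> false);
        assert ctmi.process().get(extent).getStatus() == Status.ACCEPTED;
      }
    }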
 +
 +  @Test
 +  public void testFiles() throws Exception {
 +
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var ts1 = new TServerInstance("localhost:9997", 5000L);
 +      var ts2 = new TServerInstance("localhost:9997", 6000L);
 +
 +      var context = cluster.getServerContext();
 +
 +      var stf1 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
 +      var stf2 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
 +      var stf3 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
 +      var stf4 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
 +      var dfv = new DataFileValue(100, 100);
 +
 +      System.out.println(context.getAmple().readTablet(e1).getLocation());
 +
 +      // simulate a compaction where the tablet location is not set
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +
 +      var tm1 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +          .build();
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES)
 +          .putFile(stf4, new DataFileValue(0, 0)).submit(tm -> false);
 +      var results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles());
 +
 +      var tm2 = 
TabletMetadata.builder(e1).putLocation(Location.current(ts1)).build();
 +      // simulate minor compactions where the tablet location is not set
 +      for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
 +        ctmi = new ConditionalTabletsMutatorImpl(context);
 +        ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, 
LOCATION)
 +            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
 +        results = ctmi.process();
 +        assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      }
 +
 +      assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles());
 +
 +      // set the location
 +      var tm3 = TabletMetadata.builder(e1).build(LOCATION);
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, 
LOCATION)
 +          .putLocation(Location.current(ts1)).submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      var tm4 = 
TabletMetadata.builder(e1).putLocation(Location.current(ts2)).build();
 +      // simulate minor compactions where the tablet location is wrong
 +      for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
 +        ctmi = new ConditionalTabletsMutatorImpl(context);
 +        ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm4, 
LOCATION)
 +            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
 +        results = ctmi.process();
 +        assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      }
 +
 +      assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles());
 +
 +      // simulate minor compactions where the tablet location is set
 +      for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
 +        ctmi = new ConditionalTabletsMutatorImpl(context);
 +        ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, 
LOCATION)
 +            .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
 +        results = ctmi.process();
 +        assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      }
 +
 +      assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +
 +      // simulate a compaction and test a subset and superset of files
 +      for (var tabletMeta : List.of(
 +          TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).build(),
 +          TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +              .putFile(stf4, dfv).build())) {
 +        ctmi = new ConditionalTabletsMutatorImpl(context);
 +        
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta, FILES)
 +            .putFile(stf4, new DataFileValue(0, 
0)).deleteFile(stf1).deleteFile(stf2)
 +            .deleteFile(stf3).submit(tm -> false);
 +        results = ctmi.process();
 +        assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +        assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +      }
 +
 +      // simulate a compaction
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +          .build();
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES)
 +          .putFile(stf4, new DataFileValue(0, 
0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3)
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(stf4), 
context.getAmple().readTablet(e1).getFiles());
 +
 +      // simulate a bulk import
 +      var stf5 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/b-0000009/I0000074.rf"));
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      var tm6 = TabletMetadata.builder(e1).build(LOADED);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
 +          .putFile(stf5, new DataFileValue(0, 
0)).putBulkFile(stf5.getTabletFile(), 9L)
 +          .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(stf4, stf5), 
context.getAmple().readTablet(e1).getFiles());
 +
 +      // simulate a compaction
 +      var stf6 = StoredTabletFile
 +          .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf"));
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      var tm7 = TabletMetadata.builder(e1).putFile(stf4, dfv).putFile(stf5, 
dfv).build();
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES)
 +          .putFile(stf6, new DataFileValue(0, 
0)).deleteFile(stf4).deleteFile(stf5)
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(stf6), 
context.getAmple().readTablet(e1).getFiles());
 +
 +      // simulate trying to re-bulk-import a file after a compaction
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED)
 +          .putFile(stf5, new DataFileValue(0, 
0)).putBulkFile(stf5.getTabletFile(), 9L)
 +          .submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(stf6), 
context.getAmple().readTablet(e1).getFiles());
 +    }
 +  }
 +
 +  @Test
 +  public void testWALs() {
 +    var context = cluster.getServerContext();
 +
 +    // Test adding a WAL to a tablet and verifying its presence
 +    String walFilePath =
-         java.nio.file.Path.of("tserver:8080", 
UUID.randomUUID().toString()).toString();
-     LogEntry originalLogEntry = new LogEntry(walFilePath);
++        java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
++    LogEntry originalLogEntry = LogEntry.fromPath(walFilePath);
 +    ConditionalTabletsMutatorImpl ctmi = new 
ConditionalTabletsMutatorImpl(context);
 +    // create a tablet metadata with no write ahead logs
 +    var tmEmptySet = TabletMetadata.builder(e1).build(LOGS);
 +    // tablet should not have any logs to start with so requireSame with the empty logs should pass
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet, 
LOGS)
 +        .putWal(originalLogEntry).submit(tm -> false);
 +    var results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +    Set<LogEntry> expectedLogs = new HashSet<>();
 +    expectedLogs.add(originalLogEntry);
 +    assertEquals(expectedLogs, new 
HashSet<>(context.getAmple().readTablet(e1).getLogs()),
 +        "The original LogEntry should be present.");
 +
 +    // Test adding another WAL and verifying the update
 +    String walFilePath2 =
-         java.nio.file.Path.of("tserver:8080", 
UUID.randomUUID().toString()).toString();
-     LogEntry newLogEntry = new LogEntry(walFilePath2);
++        java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
++    LogEntry newLogEntry = LogEntry.fromPath(walFilePath2);
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    
ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm -> 
false);
 +    results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +    // Verify that both the original and new WALs are present
 +    expectedLogs.add(newLogEntry);
 +    HashSet<LogEntry> actualLogs = new 
HashSet<>(context.getAmple().readTablet(e1).getLogs());
 +    assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry 
should be present.");
 +
 +    String walFilePath3 =
-         java.nio.file.Path.of("tserver:8080", 
UUID.randomUUID().toString()).toString();
-     LogEntry otherLogEntry = new LogEntry(walFilePath3);
++        java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
++    LogEntry otherLogEntry = LogEntry.fromPath(walFilePath3);
 +
 +    // create a powerset to ensure all possible subsets fail when used with requireSame, except
 +    // the expected current state
 +    Set<LogEntry> allLogs = Set.of(originalLogEntry, newLogEntry, 
otherLogEntry);
 +    Set<Set<LogEntry>> allSubsets = Sets.powerSet(allLogs);
 +
 +    for (Set<LogEntry> subset : allSubsets) {
 +      // Skip the subset that matches the current state of the tablet
 +      if (subset.equals(expectedLogs)) {
 +        continue;
 +      }
 +
 +      final TabletMetadataBuilder builder = TabletMetadata.builder(e1);
 +      subset.forEach(builder::putWal);
 +      TabletMetadata tmSubset = builder.build(LOGS);
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset, 
LOGS)
 +          .deleteWal(originalLogEntry).submit(t -> false);
 +      results = ctmi.process();
 +
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      // ensure the operation did not go through
 +      actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs());
 +      assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry 
should be present.");
 +    }
 +
 +    // Test that requiring the current WALs gets accepted when making an update (deleting a WAL in
 +    // this example)
 +    TabletMetadata tm2 =
 +        
TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS);
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS)
 +        .deleteWal(originalLogEntry).submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus(),
 +        "Requiring the current WALs should result in acceptance when making 
an update.");
 +
 +    // Verify that the update went through as expected
 +    assertEquals(List.of(newLogEntry), 
context.getAmple().readTablet(e1).getLogs(),
 +        "Only the new LogEntry should remain after deleting the original.");
 +  }
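
The powerset loop in testWALs above is a compact way to assert that requireSame(..., LOGS) rejects every possible WAL set other than the tablet's actual current one. The enumeration technique in isolation, using the same Guava Sets.powerSet call the test uses (the string elements below are made up):

    import java.util.Set;
    import com.google.common.collect.Sets;

    final class PowerSetSketch {
      public static void main(String[] args) {
        Set<String> allKnown = Set.of("wal1", "wal2", "wal3");
        Set<String> currentState = Set.of("wal1", "wal2");

        // Sets.powerSet yields all 2^3 = 8 subsets; each subset other than the real current
        // state plays the role of a stale expectation that requireSame must reject.
        for (Set<String> subset : Sets.powerSet(allKnown)) {
          if (subset.equals(currentState)) {
            continue; // only the exact current state should be accepted
          }
          System.out.println("expect REJECTED when requiring " + subset);
        }
      }
    }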
 +
 +  @Test
 +  public void testSelectedFiles() throws Exception {
 +    var context = cluster.getServerContext();
 +
 +    var stf1 = StoredTabletFile
 +        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
 +    var stf2 = StoredTabletFile
 +        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
 +    var stf3 = StoredTabletFile
 +        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
 +    var stf4 = StoredTabletFile
 +        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
 +    var dfv = new DataFileValue(100, 100);
 +
 +    System.out.println(context.getAmple().readTablet(e1).getLocation());
 +
 +    // simulate a compaction where the tablet location is not set
 +    var ctmi = new ConditionalTabletsMutatorImpl(context);
 +    var tm1 = TabletMetadata.builder(e1).build(FILES, SELECTED);
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, 
FILES).putFile(stf1, dfv)
 +        .putFile(stf2, dfv).putFile(stf3, dfv).submit(tm -> false);
 +    var results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, 
SELECTED)
 +        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
2L))
 +        .submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +    assertNull(context.getAmple().readTablet(e1).getSelectedFiles());
 +
 +    var tm2 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +        .build(SELECTED);
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, 
SELECTED)
 +        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
2L))
 +        .submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +    assertEquals(Set.of(stf1, stf2, stf3),
 +        context.getAmple().readTablet(e1).getSelectedFiles().getFiles());
 +
 +    // a list of SelectedFiles objects that are not the same as the current tablet's and are
 +    // expected to fail
 +    var expectedToFail = new ArrayList<SelectedFiles>();
 +
 +    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, 2L));
 +    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), 
true, 2L));
 +    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, 
2L));
 +    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 3L));
 +
 +    for (var selectedFiles : expectedToFail) {
 +      var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +          .putSelectedFiles(selectedFiles).build();
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, FILES, 
SELECTED)
 +          .deleteSelectedFiles().submit(tm -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +
 +      assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +      assertEquals(Set.of(stf1, stf2, stf3),
 +          context.getAmple().readTablet(e1).getSelectedFiles().getFiles());
 +    }
 +
 +    var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, 
dfv).putFile(stf3, dfv)
 +        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 
2L)).build();
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, 
SELECTED)
 +        .deleteSelectedFiles().submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
 +    assertNull(context.getAmple().readTablet(e1).getSelectedFiles());
 +  }
 +
 +  /**
 +   * Verifies that if selected files have been manually changed in the metadata, the files will be
 +   * re-ordered before being read
 +   */
 +  @Test
 +  public void testSelectedFilesReordering() throws Exception {
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var context = cluster.getServerContext();
 +
 +      String pathPrefix = 
"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/";
 +      StoredTabletFile stf1 = StoredTabletFile.of(new Path(pathPrefix + 
"F0000070.rf"));
 +      StoredTabletFile stf2 = StoredTabletFile.of(new Path(pathPrefix + 
"F0000071.rf"));
 +      StoredTabletFile stf3 = StoredTabletFile.of(new Path(pathPrefix + 
"F0000072.rf"));
 +
 +      final Set<StoredTabletFile> storedTabletFiles = Set.of(stf1, stf2, 
stf3);
 +      final boolean initiallySelectedAll = true;
 +      final long fateTxId = 2L;
 +      final SelectedFiles selectedFiles =
 +          new SelectedFiles(storedTabletFiles, initiallySelectedAll, 
fateTxId);
 +
 +      ConditionalTabletsMutatorImpl ctmi = new 
ConditionalTabletsMutatorImpl(context);
 +
 +      // write the SelectedFiles to the keyextent
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().putSelectedFiles(selectedFiles)
 +          .submit(tm -> false);
 +
 +      // verify we can read the selected files
 +      Status mutationStatus = ctmi.process().get(e1).getStatus();
 +      assertEquals(Status.ACCEPTED, mutationStatus, "Failed to put selected 
files to tablet");
 +      assertEquals(selectedFiles, 
context.getAmple().readTablet(e1).getSelectedFiles(),
 +          "Selected files should match those that were written");
 +
 +      final Text row = e1.toMetaRow();
 +      final Text selectedColumnFamily = SELECTED_COLUMN.getColumnFamily();
 +      final Text selectedColumnQualifier = 
SELECTED_COLUMN.getColumnQualifier();
 +
 +      Supplier<String> selectedMetadataValue = () -> {
 +        try (Scanner scanner = client.createScanner(MetadataTable.NAME)) {
 +          scanner.fetchColumn(selectedColumnFamily, selectedColumnQualifier);
 +          scanner.setRange(new Range(row));
 +
 +          return scanner.stream().map(Map.Entry::getValue).map(Value::get)
 +              .map(entry -> new String(entry, UTF_8)).collect(onlyElement());
 +        } catch (Exception e) {
 +          throw new RuntimeException(e);
 +        }
 +      };
 +
 +      String actualMetadataValue = selectedMetadataValue.get();
 +      final String expectedMetadata = selectedFiles.getMetadataValue();
 +      assertEquals(expectedMetadata, actualMetadataValue,
 +          "Value should be equal to metadata of SelectedFiles object that was 
written");
 +
 +      // get the string value of the paths
 +      List<String> filesPathList = 
storedTabletFiles.stream().map(StoredTabletFile::toString)
 +          .sorted().collect(Collectors.toList());
 +
 +      // verify we have the correct json format
 +      String newJson = createSelectedFilesJson(fateTxId, 
initiallySelectedAll, filesPathList);
 +      assertEquals(actualMetadataValue, newJson,
 +          "Test json should be identical to actual metadata at this point");
 +
 +      // reverse the order of the files and create a new json
 +      Collections.reverse(filesPathList);
 +      newJson = createSelectedFilesJson(fateTxId, initiallySelectedAll, 
filesPathList);
 +      assertNotEquals(actualMetadataValue, newJson,
 +          "Test json should have reverse file order of actual metadata");
 +
 +      // write the json with reverse file order
 +      try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) {
 +        Mutation mutation = new Mutation(row);
 +        mutation.put(selectedColumnFamily, selectedColumnQualifier,
 +            new Value(newJson.getBytes(UTF_8)));
 +        bw.addMutation(mutation);
 +      }
 +
 +      // verify the metadata has been changed
 +      actualMetadataValue = selectedMetadataValue.get();
 +      assertEquals(newJson, actualMetadataValue,
 +          "Value should be equal to the new json we manually wrote above");
 +
 +      TabletMetadata tm1 =
 +          
TabletMetadata.builder(e1).putSelectedFiles(selectedFiles).build(SELECTED);
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      StoredTabletFile stf4 = StoredTabletFile.of(new Path(pathPrefix + 
"F0000073.rf"));
 +      // submit a mutation with the condition that the selected files match what was originally
 +      // written
 +      DataFileValue dfv = new DataFileValue(100, 100);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, 
SELECTED).putFile(stf4, dfv)
 +          .submit(tm -> false);
 +
 +      mutationStatus = ctmi.process().get(e1).getStatus();
 +
 +      // with a SortedFilesIterator attached to the Condition for SELECTED, the metadata should be
 +      // re-ordered and once again match what was originally written, meaning the conditional
 +      // mutation should have been accepted and the file added should be present
 +      assertEquals(Status.ACCEPTED, mutationStatus);
 +      assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles(),
 +          "Expected to see the file that was added by the mutation");
 +    }
 +  }
 +
 +  /**
 +   * Creates a json suitable to create a SelectedFiles object from. The given parameters will be
 +   * inserted into the returned json String. Example of returned json String:
 +   *
 +   * <pre>
 +   * {
 +   *   "txid": "FATE[123456]",
 +   *   "selAll": true,
 +   *   "files": ["/path/to/file1.rf", "/path/to/file2.rf"]
 +   * }
 +   * </pre>
 +   */
 +  public static String createSelectedFilesJson(Long txid, boolean selAll,
 +      Collection<String> paths) {
 +    String filesJsonArray = GSON.get().toJson(paths);
 +    String formattedTxid = 
FateTxId.formatTid(Long.parseLong(Long.toString(txid), 16));
 +    return ("{'txid':'" + formattedTxid + "','selAll':" + selAll + 
",'files':" + filesJsonArray
 +        + "}").replace('\'', '\"');
 +  }
 +
 +  @Test
 +  public void testMultipleExtents() {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var ts1 = new TServerInstance("localhost:9997", 5000L);
 +      var ts2 = new TServerInstance("localhost:9997", 6000L);
 +
 +      var context = cluster.getServerContext();
 +
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts1)).submit(tm -> false);
 +      ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts2)).submit(tm -> false);
 +      var results = ctmi.process();
 +
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
 +
 +      assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +      assertEquals(Location.future(ts2), 
context.getAmple().readTablet(e2).getLocation());
 +      assertNull(context.getAmple().readTablet(e3).getLocation());
 +      assertNull(context.getAmple().readTablet(e4).getLocation());
 +
 +      assertEquals(Set.of(e1, e2), results.keySet());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts2)).submit(tm -> false);
 +      ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts1)).submit(tm -> false);
 +      ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts1)).submit(tm -> false);
 +      ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation()
 +          .putLocation(Location.future(ts2)).submit(tm -> false);
 +      results = ctmi.process();
 +
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertEquals(Status.REJECTED, results.get(e2).getStatus());
 +      assertEquals(Status.ACCEPTED, results.get(e3).getStatus());
 +      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
 +
 +      assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 +      assertEquals(Location.future(ts2), 
context.getAmple().readTablet(e2).getLocation());
 +      assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e3).getLocation());
 +      assertEquals(Location.future(ts2), 
context.getAmple().readTablet(e4).getLocation());
 +
 +      assertEquals(Set.of(e1, e2, e3, e4), results.keySet());
 +
 +    }
 +  }
 +
 +  @Test
 +  public void testOperations() {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var context = cluster.getServerContext();
 +
 +      var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]");
 +      var opid2 = TabletOperationId.from("MERGING:FATE[5678]");
 +
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> 
false);
 +      
ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> 
false);
 +      
ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
 +      var results = ctmi.process();
 +
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
 +      assertEquals(Status.REJECTED, results.get(e3).getStatus());
 +      assertEquals(TabletOperationType.SPLITTING,
 +          context.getAmple().readTablet(e1).getOperationId().getType());
 +      assertEquals(opid1, context.getAmple().readTablet(e1).getOperationId());
 +      assertEquals(TabletOperationType.MERGING,
 +          context.getAmple().readTablet(e2).getOperationId().getType());
 +      assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId());
 +      assertNull(context.getAmple().readTablet(e3).getOperationId());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> 
false);
 +      
ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
 +      results = ctmi.process();
 +
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertEquals(Status.REJECTED, results.get(e2).getStatus());
 +      assertEquals(TabletOperationType.SPLITTING,
 +          context.getAmple().readTablet(e1).getOperationId().getType());
 +      assertEquals(TabletOperationType.MERGING,
 +          context.getAmple().readTablet(e2).getOperationId().getType());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      
ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> 
false);
 +      
ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> 
false);
 +      results = ctmi.process();
 +
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
 +      assertNull(context.getAmple().readTablet(e1).getOperationId());
 +      assertNull(context.getAmple().readTablet(e2).getOperationId());
 +    }
 +  }
 +
 +  @Test
 +  public void testCompacted() {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var context = cluster.getServerContext();
 +
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +
 +      var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
 +          .putCompacted(55L).submit(tabletMetadata -> 
tabletMetadata.getCompacted().contains(55L));
 +      var tabletMeta2 = 
TabletMetadata.builder(e2).putCompacted(45L).build(COMPACTED);
 +      ctmi.mutateTablet(e2).requireAbsentOperation().requireSame(tabletMeta2, 
COMPACTED)
 +          .putCompacted(56L).submit(tabletMetadata -> 
tabletMetadata.getCompacted().contains(56L));
 +
 +      var results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(Status.REJECTED, results.get(e2).getStatus());
 +
 +      tabletMeta1 = context.getAmple().readTablet(e1);
 +      assertEquals(Set.of(55L), tabletMeta1.getCompacted());
 +      assertEquals(Set.of(), 
context.getAmple().readTablet(e2).getCompacted());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
 +          .putCompacted(65L).putCompacted(75L).submit(tabletMetadata -> 
false);
 +
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +
 +      tabletMeta1 = context.getAmple().readTablet(e1);
 +      assertEquals(Set.of(55L, 65L, 75L), tabletMeta1.getCompacted());
 +
 +      // test require same with a superset
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      tabletMeta1 = 
TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L)
 +          .putCompacted(45L).build(COMPACTED);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
 +          .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L)
 +          .submit(tabletMetadata -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertEquals(Set.of(55L, 65L, 75L), 
context.getAmple().readTablet(e1).getCompacted());
 +
 +      // test require same with a subset
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      tabletMeta1 = 
TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).build(COMPACTED);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
 +          .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L)
 +          .submit(tabletMetadata -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertEquals(Set.of(55L, 65L, 75L), 
context.getAmple().readTablet(e1).getCompacted());
 +
 +      // now use the exact set the tablet has
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      tabletMeta1 = 
TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L)
 +          .build(COMPACTED);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
COMPACTED)
 +          .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L)
 +          .submit(tabletMetadata -> false);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(Set.of(), 
context.getAmple().readTablet(e1).getCompacted());
 +    }
 +  }
 +
 +  @Test
 +  public void testRootTabletUpdate() {
 +    var context = cluster.getServerContext();
 +
 +    var rootMeta = context.getAmple().readTablet(RootTable.EXTENT);
 +    var loc = rootMeta.getLocation();
 +
 +    assertEquals(LocationType.CURRENT, loc.getType());
 +    assertNull(rootMeta.getOperationId());
 +
 +    TabletOperationId opid = 
TabletOperationId.from(TabletOperationType.MERGING, 7);
 +
 +    var ctmi = new ConditionalTabletsMutatorImpl(context);
 +    
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation()
 +        .putOperation(opid).submit(tm -> false);
 +    var results = ctmi.process();
 +    assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
 +    
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
 +
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
 +        
.requireLocation(Location.future(loc.getServerInstance())).putOperation(opid)
 +        .submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
 +    
assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId());
 +
 +    ctmi = new ConditionalTabletsMutatorImpl(context);
 +    ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
 +        
.requireLocation(Location.current(loc.getServerInstance())).putOperation(opid)
 +        .submit(tm -> false);
 +    results = ctmi.process();
 +    assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus());
 +    assertEquals(opid.canonical(),
 +        
context.getAmple().readTablet(RootTable.EXTENT).getOperationId().canonical());
 +  }
 +
 +  @Test
 +  public void testTime() {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var context = cluster.getServerContext();
 +
 +      for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL),
 +          new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, 
TimeType.LOGICAL))) {
 +        var ctmi = new ConditionalTabletsMutatorImpl(context);
 +        var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build();
 +        
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
 +            .putTime(new MetadataTime(101, 
TimeType.LOGICAL)).submit(tabletMetadata -> false);
 +        var results = ctmi.process();
 +        assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +        assertEquals(new MetadataTime(0, TimeType.MILLIS),
 +            context.getAmple().readTablet(e1).getTime());
 +      }
 +
 +      for (int i = 0; i < 10; i++) {
 +        var ctmi = new ConditionalTabletsMutatorImpl(context);
 +        var tabletMeta1 =
 +            TabletMetadata.builder(e1).putTime(new MetadataTime(i, 
TimeType.MILLIS)).build();
 +        
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
 +            .putTime(new MetadataTime(i + 1, 
TimeType.MILLIS)).submit(tabletMetadata -> false);
 +        var results = ctmi.process();
 +        assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +        assertEquals(new MetadataTime(i + 1, TimeType.MILLIS),
 +            context.getAmple().readTablet(e1).getTime());
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testFlushId() {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var context = cluster.getServerContext();
 +
 +      assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
 +
 +      var ctmi = new ConditionalTabletsMutatorImpl(context);
 +
 +      var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build();
 +      assertTrue(tabletMeta1.getFlushId().isPresent());
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
FLUSH_ID)
 +          .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
 +      var results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID);
 +      assertFalse(tabletMeta2.getFlushId().isPresent());
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, 
FLUSH_ID)
 +          .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(43L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build();
 +      assertTrue(tabletMeta3.getFlushId().isPresent());
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
 +          .putFlushId(44L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 44L);
 +      results = ctmi.process();
 +      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
 +      assertEquals(44L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
 +
 +      ctmi = new ConditionalTabletsMutatorImpl(context);
 +      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
 +          .putFlushId(45L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 45L);
 +      results = ctmi.process();
 +      assertEquals(Status.REJECTED, results.get(e1).getStatus());
 +      assertEquals(44L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
 +    }
 +  }
 +
 +  @Test
 +  public void testAsyncMutator() throws Exception {
 +    var table = getUniqueNames(2)[1];
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      // The AsyncConditionalTabletsMutatorImpl processes batches of conditional mutations.
 +      // Run tests where more than the batch size is processed and ensure this is handled
 +      // correctly.
 +
 +      TreeSet<Text> splits =
 +          IntStream.range(1, (int) 
(AsyncConditionalTabletsMutatorImpl.BATCH_SIZE * 2.5))
 +              .mapToObj(i -> new Text(String.format("%06d", i)))
 +              .collect(Collectors.toCollection(TreeSet::new));
 +
 +      assertTrue(splits.size() > 
AsyncConditionalTabletsMutatorImpl.BATCH_SIZE);
 +
 +      c.tableOperations().create(table, new 
NewTableConfiguration().withSplits(splits));
 +      var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
 +
 +      var ample = cluster.getServerContext().getAmple();
 +
 +      AtomicLong accepted = new AtomicLong(0);
 +      AtomicLong total = new AtomicLong(0);
 +      Consumer<Ample.ConditionalResult> resultsConsumer = result -> {
 +        if (result.getStatus() == Status.ACCEPTED) {
 +          accepted.incrementAndGet();
 +        }
 +        total.incrementAndGet();
 +      };
 +
 +      // run a test where a subset of tablets is modified; all modifications should be
 +      // accepted
 +      var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50);
 +
 +      int expected = 0;
 +      try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, 
PREV_ROW).build();
 +          var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
 +        for (var tablet : tablets) {
 +          if (tablet.getEndRow() != null
 +              && Integer.parseInt(tablet.getEndRow().toString()) % 2 == 0) {
 +            
mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid1)
 +                .submit(tm -> opid1.equals(tm.getOperationId()));
 +            expected++;
 +          }
 +        }
 +      }
 +
 +      assertTrue(expected > 0);
 +      assertEquals(expected, accepted.get());
 +      assertEquals(total.get(), accepted.get());
 +
 +      // run a test where some mutations will be accepted and some rejected, and ensure
 +      // the counts come out as expected.
 +      var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51);
 +
 +      accepted.set(0);
 +      total.set(0);
 +
 +      try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, 
PREV_ROW).build();
 +          var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
 +        for (var tablet : tablets) {
 +          
mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid2)
 +              .submit(tm -> opid2.equals(tm.getOperationId()));
 +        }
 +      }
 +
 +      var numTablets = splits.size() + 1;
 +      assertEquals(numTablets - expected, accepted.get());
 +      assertEquals(numTablets, total.get());
 +    }
 +  }
 +
 +}
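
For readers skimming this merge: the AmpleConditionalWriterIT tests above all exercise one conditional-update pattern. The sketch below is illustrative only and not part of the diff; it restates that pattern using names from the test itself, assuming the same scaffolding (context, extent e1, the FLUSH_ID column type, and ConditionalTabletsMutatorImpl) is in scope.

    // Sketch of the pattern exercised by testFlushId and the other tests above.
    var expected = TabletMetadata.builder(e1).putFlushId(43L).build(); // expected current state
    var ctmi = new ConditionalTabletsMutatorImpl(context);
    ctmi.mutateTablet(e1)
        .requireAbsentOperation()          // reject if an operation id is set on the tablet
        .requireSame(expected, FLUSH_ID)   // reject if the stored flush id differs from 43
        .putFlushId(44L)                   // update applied only when the requirements hold
        .submit(tm -> tm.getFlushId().orElse(-1) == 44L); // same style of check as the tests above
    var results = ctmi.process();
    assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
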
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index ae8b24ad9c,0000000000..5ca097da33
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@@ -1,484 -1,0 +1,484 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +
 +import java.io.IOException;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchDeleter;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.manager.thrift.ManagerState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 +import org.apache.accumulo.server.manager.state.TabletManagementParameters;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Test to ensure that the {@link TabletManagementIterator} properly skips 
over tablet information
 + * in the metadata table when there is no work to be done on the tablet (see 
ACCUMULO-3580)
 + */
 +public class TabletManagementIteratorIT extends AccumuloClusterHarness {
 +  private final static Logger log = 
LoggerFactory.getLogger(TabletManagementIteratorIT.class);
 +
 +  @Override
 +  protected Duration defaultTimeout() {
 +    return Duration.ofMinutes(3);
 +  }
 +
 +  @Test
 +  public void test() throws AccumuloException, AccumuloSecurityException, 
TableExistsException,
 +      TableNotFoundException, IOException {
 +
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      String[] tables = getUniqueNames(8);
 +      final String t1 = tables[0];
 +      final String t2 = tables[1];
 +      final String t3 = tables[2];
 +      final String t4 = tables[3];
 +      final String metaCopy1 = tables[4];
 +      final String metaCopy2 = tables[5];
 +      final String metaCopy3 = tables[6];
 +      final String metaCopy4 = tables[7];
 +
 +      // create some metadata
 +      createTable(client, t1, true);
 +      createTable(client, t2, false);
 +      createTable(client, t3, true);
 +      createTable(client, t4, true);
 +
 +      // Scan table t3, which will cause its tablets to be hosted. Then, remove the
 +      // location.
 +      Scanner s = client.createScanner(t3);
 +      s.setRange(new Range());
 +      @SuppressWarnings("unused")
 +      var unused = Iterables.size(s); // consume all the data
 +
 +      // examine a clone of the metadata table, so we can manipulate it
 +      copyTable(client, MetadataTable.NAME, metaCopy1);
 +
 +      TabletManagementParameters tabletMgmtParams = createParameters(client);
 +      int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams);
 +      while (tabletsInFlux > 0) {
 +        log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1);
 +        UtilWaitThread.sleep(500);
 +        copyTable(client, MetadataTable.NAME, metaCopy1);
 +        tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams);
 +      }
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "No tables should need attention");
 +
 +      // The metadata table stabilized and metaCopy1 contains a copy suitable 
for testing. Before
 +      // metaCopy1 is modified, copy it for subsequent test.
 +      copyTable(client, metaCopy1, metaCopy2);
 +      copyTable(client, metaCopy1, metaCopy3);
 +      copyTable(client, metaCopy1, metaCopy4);
 +
 +      // t1 is unassigned, setting to always will generate a change to host 
tablets
 +      setTabletHostingGoal(client, metaCopy1, t1, 
TabletHostingGoal.ALWAYS.name());
 +      // t3 is hosted, setting to never will generate a change to unhost 
tablets
 +      setTabletHostingGoal(client, metaCopy1, t3, 
TabletHostingGoal.NEVER.name());
 +      tabletMgmtParams = createParameters(client);
 +      assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have four tablets with hosting goal changes");
 +
 +      // test the assigned case (no location)
 +      removeLocation(client, metaCopy1, t3);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have two tablets without a loc");
 +
 +      // Test setting the operation id on one of the tablets in table t1. 
Table t1 has two tablets
 +      // w/o a location. Only one should need attention because of the 
operation id.
 +      setOperationId(client, metaCopy1, t1, new Text("some split"), 
TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have tablets needing attention because of operation id");
 +
 +      // test the cases where the assignment is to a dead tserver
 +      reassignLocation(client, metaCopy2, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, 
tabletMgmtParams),
 +          "Only 1 of 2 tablets in table t1 should be returned");
 +
 +      // Remove the location and later set a merge operation id on both tablets. Once the
 +      // operation id is set, these tablets should not need attention because they have no
 +      // WALs.
 +      setTabletHostingGoal(client, metaCopy4, t4, 
TabletHostingGoal.ALWAYS.name());
 +      removeLocation(client, metaCopy4, t4);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Tablets have no location and a hosting goal of always, so they 
should need attention");
 +
 +      // Test MERGING and SPLITTING do not need attention with no location or 
wals
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.MERGING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for merge as they have no 
location");
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.SPLITTING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for merge as they have no 
location");
 +
 +      // Create a log entry for one of the tablets, this tablet will now need 
attention
 +      // for both MERGING and SPLITTING
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.MERGING);
 +      createLogEntry(client, metaCopy4, t4);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +      // Switch op to SPLITTING which should also need attention like MERGING
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +
 +      // Switch op to delete, no tablets should need attention even with WALs
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.DELETING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for delete");
 +
 +      // test the bad tablet location state case (inconsistent metadata)
 +      tabletMgmtParams = createParameters(client);
 +      addDuplicateLocation(client, metaCopy3, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, 
tabletMgmtParams),
 +          "Should have 1 tablet that needs a metadata repair");
 +
 +      // test the volume replacements case. Need to insert some files into
 +      // the metadata for t4, then run the TabletManagementIterator with
 +      // volume replacements
 +      addFiles(client, metaCopy4, t4);
 +      List<Pair<Path,Path>> replacements = new ArrayList<>();
 +      replacements.add(new Pair<Path,Path>(new 
Path("file:/vol1/accumulo/inst_id"),
 +          new Path("file:/vol2/accumulo/inst_id")));
 +      tabletMgmtParams = createParameters(client, replacements);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have one tablet that needs a volume replacement");
 +
 +      // clean up
 +      dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, 
metaCopy4);
 +    }
 +  }
 +
 +  private void setTabletHostingGoal(AccumuloClient client, String table, 
String tableNameToModify,
 +      String state) throws TableNotFoundException, MutationsRejectedException 
{
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      scanner.setRange(new KeyExtent(tableIdToModify, null, 
null).toMetaRange());
 +      for (Entry<Key,Value> entry : scanner) {
 +        Mutation m = new Mutation(entry.getKey().getRow());
 +        m.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(),
 +            HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), 
entry.getKey().getTimestamp() + 1,
 +            new Value(state));
 +        try (BatchWriter bw = client.createBatchWriter(table)) {
 +          bw.addMutation(m);
 +        }
 +      }
 +    }
 +  }
 +
 +  private void addDuplicateLocation(AccumuloClient client, String table, 
String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, 
null).toMetaRow());
 +    m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new 
Value("fake:9005"));
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +  }
 +
 +  private void addFiles(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, 
null).toMetaRow());
 +    m.put(DataFileColumnFamily.NAME,
 +        new Text(StoredTabletFile
 +            
.serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")),
 +        new Value(new DataFileValue(0, 0, 0).encode()));
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +    // dump the metadata copy to the debug log for troubleshooting
 +    try {
 +      client.createScanner(table).iterator()
 +          .forEachRemaining(e -> log.debug("{} -> {}", e.getKey(), e.getValue()));
 +    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private void reassignLocation(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      scanner.setRange(new KeyExtent(tableIdToModify, null, 
null).toMetaRange());
 +      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +      Entry<Key,Value> entry = scanner.iterator().next();
 +      Mutation m = new Mutation(entry.getKey().getRow());
 +      m.putDelete(entry.getKey().getColumnFamily(), 
entry.getKey().getColumnQualifier(),
 +          entry.getKey().getTimestamp());
 +      m.put(entry.getKey().getColumnFamily(), new Text("1234567"),
 +          entry.getKey().getTimestamp() + 1, new Value("fake:9005"));
 +      try (BatchWriter bw = client.createBatchWriter(table)) {
 +        bw.addMutation(m);
 +      }
 +    }
 +  }
 +
 +  // Sets an operation id (of the given type) on all tablets up to the end row
 +  private void setOperationId(AccumuloClient client, String table, String 
tableNameToModify,
 +      Text end, TabletOperationType opType) throws TableNotFoundException {
 +    var opid = TabletOperationId.from(opType, 42L);
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    try (TabletsMetadata tabletsMetadata =
 +        getServerContext().getAmple().readTablets().forTable(tableIdToModify)
 +            .overlapping(null, end != null ? 
TabletsSection.encodeRow(tableIdToModify, end) : null)
 +            .fetch(ColumnType.PREV_ROW).build()) {
 +      for (TabletMetadata tabletMetadata : tabletsMetadata) {
 +        Mutation m = new Mutation(tabletMetadata.getExtent().toMetaRow());
 +        MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m,
 +            new Value(opid.canonical()));
 +        try (BatchWriter bw = client.createBatchWriter(table)) {
 +          bw.addMutation(m);
 +        } catch (MutationsRejectedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  private void removeLocation(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    BatchDeleter deleter = client.createBatchDeleter(table, 
Authorizations.EMPTY, 1);
 +    deleter
 +        .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, 
null).toMetaRange()));
 +    deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +    deleter.delete();
 +    deleter.close();
 +  }
 +
 +  private int findTabletsNeedingAttention(AccumuloClient client, String table,
 +      TabletManagementParameters tabletMgmtParams) throws 
TableNotFoundException, IOException {
 +    int results = 0;
 +    List<KeyExtent> resultList = new ArrayList<>();
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
 +      scanner.updateScanIteratorOption("tabletChange", "debug", "1");
 +      for (Entry<Key,Value> e : scanner) {
 +        if (e != null) {
 +          TabletManagement mti = TabletManagementIterator.decode(e);
 +          results++;
 +          log.debug("Found tablets that changed state: {}", 
mti.getTabletMetadata().getExtent());
 +          log.debug("metadata: {}", mti.getTabletMetadata());
 +          resultList.add(mti.getTabletMetadata().getExtent());
 +        }
 +      }
 +    }
 +    log.debug("Tablets in flux: {}", resultList);
 +    return results;
 +  }
 +
 +  private void createTable(AccumuloClient client, String t, boolean online)
 +      throws AccumuloSecurityException, AccumuloException, 
TableNotFoundException,
 +      TableExistsException {
 +    SortedSet<Text> partitionKeys = new TreeSet<>();
 +    partitionKeys.add(new Text("some split"));
 +    NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(partitionKeys);
 +    client.tableOperations().create(t, ntc);
 +    client.tableOperations().online(t);
 +    if (!online) {
 +      client.tableOperations().offline(t, true);
 +    }
 +  }
 +
 +  /**
 +   * Create a copy of the source table by first gathering all the rows of the 
source in a list of
 +   * mutations. Then create the copy of the table and apply the mutations to 
the copy.
 +   */
 +  private void copyTable(AccumuloClient client, String source, String copy)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
 +      TableExistsException {
 +    try {
 +      dropTables(client, copy);
 +    } catch (TableNotFoundException ex) {
 +      // ignored
 +    }
 +
 +    log.info("Gathering rows to copy {} ", source);
 +    List<Mutation> mutations = new ArrayList<>();
 +
 +    try (Scanner scanner = client.createScanner(source, 
Authorizations.EMPTY)) {
 +      RowIterator rows = new RowIterator(new IsolatedScanner(scanner));
 +
 +      while (rows.hasNext()) {
 +        Iterator<Entry<Key,Value>> row = rows.next();
 +        Mutation m = null;
 +
 +        while (row.hasNext()) {
 +          Entry<Key,Value> entry = row.next();
 +          Key k = entry.getKey();
 +          if (m == null) {
 +            m = new Mutation(k.getRow());
 +          }
 +
 +          m.put(k.getColumnFamily(), k.getColumnQualifier(), 
k.getColumnVisibilityParsed(),
 +              k.getTimestamp(), entry.getValue());
 +        }
 +
 +        mutations.add(m);
 +      }
 +    }
 +
 +    // metadata should be stable with only 8 rows (2 for each of the four tables)
 +    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
 +    assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for each table)");
 +    client.tableOperations().create(copy);
 +
 +    try (BatchWriter writer = client.createBatchWriter(copy)) {
 +      for (Mutation m : mutations) {
 +        writer.addMutation(m);
 +      }
 +    }
 +
 +    log.info("Finished creating copy " + copy);
 +  }
 +
 +  private void dropTables(AccumuloClient client, String... tables)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
 +    for (String t : tables) {
 +      client.tableOperations().delete(t);
 +    }
 +  }
 +
 +  // Creates a log entry on the "some split" extent; this could easily be modified to
 +  // support other extents
 +  private void createLogEntry(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws MutationsRejectedException, TableNotFoundException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), 
null);
 +    Mutation m = new Mutation(extent.toMetaRow());
 +    String fileName = "file:/accumulo/wal/localhost+9997/" + 
UUID.randomUUID().toString();
-     LogEntry logEntry = new LogEntry(fileName);
++    LogEntry logEntry = LogEntry.fromPath(fileName);
 +    logEntry.addToMutation(m);
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +  }
 +
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client) {
 +    return createParameters(client, List.of());
 +  }
 +
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client,
 +      List<Pair<Path,Path>> replacements) {
 +    var context = (ClientContext) client;
 +    Set<TableId> onlineTables = 
Sets.filter(context.getTableIdToNameMap().keySet(),
 +        tableId -> context.getTableState(tableId) == TableState.ONLINE);
 +
 +    HashSet<TServerInstance> tservers = new HashSet<>();
 +    for (String tserver : context.instanceOperations().getTabletServers()) {
 +      try {
 +        var zPath = 
ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId())
 +            + Constants.ZTSERVERS + "/" + tserver);
 +        long sessionId = ServiceLock.getSessionId(context.getZooCache(), 
zPath);
 +        tservers.add(new TServerInstance(tserver, sessionId));
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return new TabletManagementParameters(ManagerState.NORMAL,
 +        Map.of(
 +            Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, 
Ample.DataLevel.METADATA, true),
 +        onlineTables,
 +        new LiveTServerSet.LiveTServersSnapshot(tservers,
 +            Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
 +        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, 
replacements);
 +  }
 +}
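
A note on the conflict resolution visible above: the one line this merge resolves differently in this file is the LogEntry construction in createLogEntry, which now uses the factory method instead of the removed constructor. A minimal sketch of that pattern, reusing the mutation m and WAL path shown in the diff:

    String fileName = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID().toString();
    LogEntry logEntry = LogEntry.fromPath(fileName); // replaces the previous new LogEntry(fileName)
    logEntry.addToMutation(m);                       // records the WAL against the tablet's metadata row
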

