This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 4fef06761c2da98299414809f691ed85d937b4ed
Merge: e6799c9cf7 1aa81d81b0
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 13 17:58:00 2023 +0000

    Merge branch 'main' into elasticity

 .../core/client/MutationsRejectedException.java    | 11 ++--
 .../core/metadata/schema/TabletMutatorBase.java    |  2 +-
 .../accumulo/core/tabletserver/log/LogEntry.java   | 61 ++++++++++------------
 .../core/metadata/schema/TabletMetadataTest.java   | 11 ++--
 .../core/tabletserver/log/LogEntryTest.java        | 11 ++--
 .../server/constraints/MetadataConstraints.java    |  2 +-
 .../org/apache/accumulo/server/fs/VolumeUtil.java  | 10 ++--
 .../accumulo/server/util/ListVolumesUsed.java      |  2 +-
 .../constraints/MetadataConstraintsTest.java       | 56 +++++---------------
 .../apache/accumulo/server/fs/VolumeUtilTest.java  |  6 +--
 .../server/manager/state/TabletManagementTest.java |  7 +--
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |  2 +-
 .../accumulo/manager/recovery/RecoveryManager.java |  2 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  2 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  2 +-
 .../test/MissingWalHeaderCompletesRecoveryIT.java  |  6 +--
 .../functional/TabletManagementIteratorIT.java     |  4 +-
 17 files changed, 77 insertions(+), 120 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
index ea13271f6f,4f7f9c230c..2792397ab7
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
@@@ -179,10 -173,10 +179,10 @@@ public abstract class TabletMutatorBase
    }
  
    @Override
 -  public Ample.TabletMutator putWal(LogEntry logEntry) {
 +  public T putWal(LogEntry logEntry) {
      Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
-     mutation.put(LogColumnFamily.NAME, logEntry.getColumnQualifier(), 
logEntry.getValue());
+     logEntry.addToMutation(mutation);
 -    return this;
 +    return getThis();
    }
  
    @Override
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 190f7e0e3b,617bf20e86..9fad119948
--- 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@@ -416,17 -374,9 +416,17 @@@ public class MetadataConstraints implem
        case 7:
          return "Lock not held in zookeeper by writer";
        case 8:
-         return "Bulk load transaction no longer running";
+         return "Bulk load mutation contains either inconsistent files or 
multiple fateTX ids";
        case 9:
 +        return "Malformed operation id";
 +      case 10:
 +        return "Malformed hosting goal";
 +      case 11:
 +        return "Malformed file selection value";
 +      case 12:
          return "Invalid data file metadata format";
 +      case 13:
 +        return "Invalid compacted column";
      }
      return null;
    }
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 2bb653cb70,29f470ebe6..5c3ac18584
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@@ -130,59 -128,37 +130,59 @@@ public class VolumeUtil 
      }
    }
  
 -  /**
 -   * This method does two things. First, it switches any volumes a tablet is 
using that are
 -   * configured in instance.volumes.replacements. Second, if a tablet dir is 
no longer configured
 -   * for use it chooses a new tablet directory.
 -   */
 -  public static TabletFiles updateTabletVolumes(ServerContext context, 
ServiceLock zooLock,
 -      KeyExtent extent, TabletFiles tabletFiles) {
 -    List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
 +  public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> 
replacements,
 +      final TabletMetadata tm) {
      if (replacements.isEmpty()) {
 -      return tabletFiles;
 +      return false;
      }
 -    log.trace("Using volume replacements: {}", replacements);
  
 -    List<LogEntry> logsToRemove = new ArrayList<>();
 -    List<LogEntry> logsToAdd = new ArrayList<>();
 +    MutableBoolean needsReplacement = new MutableBoolean(false);
 +
 +    Consumer<LogEntry> consumer = le -> needsReplacement.setTrue();
 +
 +    volumeReplacementEvaluation(replacements, tm, consumer, consumer,
 +        f -> needsReplacement.setTrue(), (f, dfv) -> 
needsReplacement.setTrue());
 +
 +    return needsReplacement.booleanValue();
 +  }
  
 -    List<StoredTabletFile> filesToRemove = new ArrayList<>();
 -    SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new 
TreeMap<>();
 +  public static class VolumeReplacements {
 +    public final TabletMetadata tabletMeta;
 +    public final List<LogEntry> logsToRemove = new ArrayList<>();
 +    public final List<LogEntry> logsToAdd = new ArrayList<>();
 +    public final List<StoredTabletFile> filesToRemove = new ArrayList<>();
 +    public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new 
HashMap<>();
  
 -    TabletFiles ret = new TabletFiles();
 +    public VolumeReplacements(TabletMetadata tabletMeta) {
 +      this.tabletMeta = tabletMeta;
 +    }
 +  }
  
 -    for (LogEntry logEntry : tabletFiles.logEntries) {
 +  public static VolumeReplacements
 +      computeVolumeReplacements(final List<Pair<Path,Path>> replacements, 
final TabletMetadata tm) {
 +    var vr = new VolumeReplacements(tm);
 +    volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, 
vr.logsToAdd::add,
 +        vr.filesToRemove::add, vr.filesToAdd::put);
 +    return vr;
 +  }
 +
 +  public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> 
replacements,
 +      final TabletMetadata tm, final Consumer<LogEntry> logsToRemove,
 +      final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> 
filesToRemove,
 +      final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) {
 +    if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && 
tm.getLogs().isEmpty())) {
 +      return;
 +    }
 +
 +    log.trace("Using volume replacements: {}", replacements);
 +    for (LogEntry logEntry : tm.getLogs()) {
 +      log.trace("Evaluating walog {} for replacement.", logEntry);
        LogEntry switchedLogEntry = switchVolumes(logEntry, replacements);
        if (switchedLogEntry != null) {
 -        logsToRemove.add(logEntry);
 -        logsToAdd.add(switchedLogEntry);
 -        ret.logEntries.add(switchedLogEntry);
 -        log.debug("Replacing volume {} : {} -> {}", extent, 
logEntry.getLogReference(),
 +        logsToRemove.accept(logEntry);
 +        logsToAdd.accept(switchedLogEntry);
-         log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getFilePath(),
-             switchedLogEntry.getFilePath());
++        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getLogReference(),
+             switchedLogEntry.getLogReference());
 -      } else {
 -        ret.logEntries.add(logEntry);
        }
      }
  
diff --cc 
server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
index f1223ee71b,0000000000..fafa79f696
mode 100644,000000..100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java
@@@ -1,191 -1,0 +1,188 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 +import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
 +import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.util.ArrayList;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
- import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +
 +public class TabletManagementTest {
 +
 +  private SortedMap<Key,Value> toRowMap(Mutation mutation) {
 +    SortedMap<Key,Value> rowMap = new TreeMap<>();
 +    mutation.getUpdates().forEach(cu -> {
 +      Key k = new Key(mutation.getRow(), cu.getColumnFamily(), 
cu.getColumnQualifier(),
 +          cu.getTimestamp());
 +      Value v = new Value(cu.getValue());
 +      rowMap.put(k, v);
 +    });
 +    return rowMap;
 +  }
 +
 +  private SortedMap<Key,Value> createMetadataEntryKV(KeyExtent extent) {
 +
 +    Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
 +
 +    DIRECTORY_COLUMN.put(mutation, new Value("t-0001757"));
 +    FLUSH_COLUMN.put(mutation, new Value("6"));
 +    TIME_COLUMN.put(mutation, new Value("M123456789"));
 +
 +    StoredTabletFile bf1 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/bf1")).insert();
 +    StoredTabletFile bf2 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/bf2")).insert();
 +    
mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1.getMetadata())
 +        .put(FateTxId.formatTid(56));
 +    
mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2.getMetadata())
 +        .put(FateTxId.formatTid(59));
 +
 +    mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK");
 +
 +    DataFileValue dfv1 = new DataFileValue(555, 23);
 +    StoredTabletFile tf1 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/df1.rf")).insert();
 +    StoredTabletFile tf2 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/df2.rf")).insert();
 +    
mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf1.getMetadata()).put(dfv1.encode());
 +    DataFileValue dfv2 = new DataFileValue(234, 13);
 +    
mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf2.getMetadata()).put(dfv2.encode());
 +
 +    
mutation.at().family(CurrentLocationColumnFamily.NAME).qualifier("s001").put("server1:8555");
 +
 +    
mutation.at().family(LastLocationColumnFamily.NAME).qualifier("s000").put("server2:8555");
 +
 +    LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID());
-     
mutation.at().family(LogColumnFamily.NAME).qualifier(le1.getColumnQualifier())
-         .put(le1.getValue());
++    le1.addToMutation(mutation);
 +    LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID());
-     
mutation.at().family(LogColumnFamily.NAME).qualifier(le2.getColumnQualifier())
-         .put(le2.getValue());
++    le2.addToMutation(mutation);
 +
 +    StoredTabletFile sf1 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert();
 +    StoredTabletFile sf2 =
 +        new ReferencedTabletFile(new 
Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert();
 +    
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put("");
 +    
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put("");
 +
 +    return toRowMap(mutation);
 +
 +  }
 +
 +  @Test
 +  public void testEncodeDecodeWithReasons() throws Exception {
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new 
Text("da"));
 +
 +    final Set<ManagementAction> actions =
 +        Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, 
ManagementAction.NEEDS_SPLITTING);
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addActions(entries, actions);
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    // Remove the REASONS column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow().toString(), "REASONS", ""));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals(actions, tmi.getActions());
 +  }
 +
 +  @Test
 +  public void testEncodeDecodeWithErrors() throws Exception {
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new 
Text("da"));
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addError(entries, new UnsupportedOperationException("Not 
supported."));
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    // Remove the ERROR column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow().toString(), "ERROR", ""));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals("Not supported.", tmi.getErrorMessage());
 +  }
 +
 +  @Test
 +  public void testBinary() throws Exception {
 +    // test end row with non ascii data
 +    Text endRow = new Text(new byte[] {'m', (byte) 0xff});
 +    KeyExtent extent = new KeyExtent(TableId.of("5"), endRow, new Text("da"));
 +
 +    final Set<ManagementAction> actions =
 +        Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, 
ManagementAction.NEEDS_SPLITTING);
 +
 +    final SortedMap<Key,Value> entries = createMetadataEntryKV(extent);
 +
 +    TabletManagement.addActions(entries, actions);
 +    Key key = entries.firstKey();
 +    Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()),
 +        new ArrayList<>(entries.values()));
 +
 +    assertTrue(entries.keySet().stream().allMatch(k -> 
k.getRow().equals(extent.toMetaRow())));
 +
 +    // Remove the REASONS column from the entries map for the comparison check
 +    // below
 +    entries.remove(new Key(key.getRow(), new Text("REASONS"), new Text("")));
 +
 +    TabletManagement tmi = new TabletManagement(key, val, true);
 +    assertEquals(entries, tmi.getTabletMetadata().getKeyValues());
 +    assertEquals(actions, tmi.getActions());
 +
 +  }
 +}
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 493b64afb4,807ba9f690..3c533c2691
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo
        }
        // Tablet is being recovered and has WAL references, remove all the 
WALs for the dead server
        // that made the WALs.
 -      for (Collection<String> wals : state.walogs) {
 -        for (String wal : wals) {
 -          UUID walUUID = path2uuid(new Path(wal));
 -          TServerInstance dead = result.get(walUUID);
 -          // There's a reference to a log file, so skip that server's logs
 -          Set<UUID> idsToIgnore = candidates.remove(dead);
 -          if (idsToIgnore != null) {
 -            result.keySet().removeAll(idsToIgnore);
 -            recoveryLogs.keySet().removeAll(idsToIgnore);
 -          }
 +      for (LogEntry wals : tabletMetadata.getLogs()) {
-         String wal = wals.getFilePath();
++        String wal = wals.getLogReference();
 +        UUID walUUID = path2uuid(new Path(wal));
 +        TServerInstance dead = result.get(walUUID);
 +        // There's a reference to a log file, so skip that server's logs
 +        Set<UUID> idsToIgnore = candidates.remove(dead);
 +        if (idsToIgnore != null) {
 +          result.keySet().removeAll(idsToIgnore);
 +          recoveryLogs.keySet().removeAll(idsToIgnore);
          }
        }
      }
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 9cb41a90c1,86bc06930c..3c8d7922f2
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager 
      }
    }
  
 -  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> 
walogs)
 -      throws IOException {
 +  public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) 
throws IOException {
      boolean recoveryNeeded = false;
  
 -    for (Collection<String> logs : walogs) {
 -      for (String walog : logs) {
 -
 -        Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), 
FileType.WAL,
 -            manager.getContext().getVolumeReplacements());
 -        if (switchedWalog != null) {
 -          // replaces the volume used for sorting, but do not change entry in 
metadata table. When
 -          // the tablet loads it will change the metadata table entry. If
 -          // the tablet has the same replacement config, then it will find 
the sorted log.
 -          log.info("Volume replaced {} -> {}", walog, switchedWalog);
 -          walog = switchedWalog.toString();
 -        }
 +    for (LogEntry entry : walogs) {
-       String walog = entry.getFilePath();
++      String walog = entry.getLogReference();
 +
 +      Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), 
FileType.WAL,
 +          manager.getContext().getVolumeReplacements());
 +      if (switchedWalog != null) {
 +        // replaces the volume used for sorting, but do not change entry in 
metadata table. When
 +        // the tablet loads it will change the metadata table entry. If
 +        // the tablet has the same replacement config, then it will find the 
sorted log.
 +        log.info("Volume replaced {} -> {}", walog, switchedWalog);
 +        walog = switchedWalog.toString();
 +      }
  
 -        String[] parts = walog.split("/");
 -        String sortId = parts[parts.length - 1];
 -        String filename = new Path(walog).toString();
 -        String dest = RecoveryPath.getRecoveryPath(new 
Path(filename)).toString();
 +      String[] parts = walog.split("/");
 +      String sortId = parts[parts.length - 1];
 +      String filename = new Path(walog).toString();
 +      String dest = RecoveryPath.getRecoveryPath(new 
Path(filename)).toString();
 +
 +      boolean sortQueued;
 +      synchronized (this) {
 +        sortQueued = sortsQueued.contains(sortId);
 +      }
  
 -        boolean sortQueued;
 +      if (sortQueued
 +          && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + 
"/" + sortId)
 +              == null) {
          synchronized (this) {
 -          sortQueued = sortsQueued.contains(sortId);
 +          sortsQueued.remove(sortId);
          }
 +      }
  
 -        if (sortQueued
 -            && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY 
+ "/" + sortId)
 -                == null) {
 -          synchronized (this) {
 -            sortsQueued.remove(sortId);
 -          }
 +      if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 +        synchronized (this) {
 +          closeTasksQueued.remove(sortId);
 +          recoveryDelay.remove(sortId);
 +          sortsQueued.remove(sortId);
          }
 +        continue;
 +      }
  
 -        if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 -          synchronized (this) {
 -            closeTasksQueued.remove(sortId);
 -            recoveryDelay.remove(sortId);
 -            sortsQueued.remove(sortId);
 +      recoveryNeeded = true;
 +      synchronized (this) {
 +        if (!closeTasksQueued.contains(sortId) && 
!sortsQueued.contains(sortId)) {
 +          AccumuloConfiguration aconf = manager.getConfiguration();
 +          LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 +              Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, 
new HadoopLogCloser());
 +          Long delay = recoveryDelay.get(sortId);
 +          if (delay == null) {
 +            delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 +          } else {
 +            delay = Math.min(2 * delay, 1000 * 60 * 5L);
            }
 -          continue;
 -        }
  
 -        recoveryNeeded = true;
 -        synchronized (this) {
 -          if (!closeTasksQueued.contains(sortId) && 
!sortsQueued.contains(sortId)) {
 -            AccumuloConfiguration aconf = manager.getConfiguration();
 -            LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 -                Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, 
new HadoopLogCloser());
 -            Long delay = recoveryDelay.get(sortId);
 -            if (delay == null) {
 -              delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 -            } else {
 -              delay = Math.min(2 * delay, 1000 * 60 * 5L);
 -            }
 -
 -            log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 -                (delay / 1000), extent);
 -
 -            ScheduledFuture<?> future = executor.schedule(
 -                new LogSortTask(closer, filename, dest, sortId), delay, 
TimeUnit.MILLISECONDS);
 -            ThreadPools.watchNonCriticalScheduledTask(future);
 -            closeTasksQueued.add(sortId);
 -            recoveryDelay.put(sortId, delay);
 -          }
 +          log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 +              (delay / 1000), extent);
 +
 +          ScheduledFuture<?> future = executor.schedule(
 +              new LogSortTask(closer, filename, dest, sortId), delay, 
TimeUnit.MILLISECONDS);
 +          ThreadPools.watchNonCriticalScheduledTask(future);
 +          closeTasksQueued.add(sortId);
 +          recoveryDelay.put(sortId, delay);
          }
        }
      }
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 7785aa58ca,0000000000..ae8b24ad9c
mode 100644,000000..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@@ -1,486 -1,0 +1,484 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +
 +import java.io.IOException;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchDeleter;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.manager.thrift.ManagerState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
- import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 +import org.apache.accumulo.server.manager.state.TabletManagementParameters;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Test to ensure that the {@link TabletManagementIterator} properly skips 
over tablet information
 + * in the metadata table when there is no work to be done on the tablet (see 
ACCUMULO-3580)
 + */
 +public class TabletManagementIteratorIT extends AccumuloClusterHarness {
 +  private final static Logger log = 
LoggerFactory.getLogger(TabletManagementIteratorIT.class);
 +
 +  @Override
 +  protected Duration defaultTimeout() {
 +    return Duration.ofMinutes(3);
 +  }
 +
 +  @Test
 +  public void test() throws AccumuloException, AccumuloSecurityException, 
TableExistsException,
 +      TableNotFoundException, IOException {
 +
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      String[] tables = getUniqueNames(8);
 +      final String t1 = tables[0];
 +      final String t2 = tables[1];
 +      final String t3 = tables[2];
 +      final String t4 = tables[3];
 +      final String metaCopy1 = tables[4];
 +      final String metaCopy2 = tables[5];
 +      final String metaCopy3 = tables[6];
 +      final String metaCopy4 = tables[7];
 +
 +      // create some metadata
 +      createTable(client, t1, true);
 +      createTable(client, t2, false);
 +      createTable(client, t3, true);
 +      createTable(client, t4, true);
 +
 +      // Scan table t3 which will cause it's tablets
 +      // to be hosted. Then, remove the location.
 +      Scanner s = client.createScanner(t3);
 +      s.setRange(new Range());
 +      @SuppressWarnings("unused")
 +      var unused = Iterables.size(s); // consume all the data
 +
 +      // examine a clone of the metadata table, so we can manipulate it
 +      copyTable(client, MetadataTable.NAME, metaCopy1);
 +
 +      TabletManagementParameters tabletMgmtParams = createParameters(client);
 +      int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams);
 +      while (tabletsInFlux > 0) {
 +        log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1);
 +        UtilWaitThread.sleep(500);
 +        copyTable(client, MetadataTable.NAME, metaCopy1);
 +        tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams);
 +      }
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "No tables should need attention");
 +
 +      // The metadata table stabilized and metaCopy1 contains a copy suitable 
for testing. Before
 +      // metaCopy1 is modified, copy it for subsequent test.
 +      copyTable(client, metaCopy1, metaCopy2);
 +      copyTable(client, metaCopy1, metaCopy3);
 +      copyTable(client, metaCopy1, metaCopy4);
 +
 +      // t1 is unassigned, setting to always will generate a change to host 
tablets
 +      setTabletHostingGoal(client, metaCopy1, t1, 
TabletHostingGoal.ALWAYS.name());
 +      // t3 is hosted, setting to never will generate a change to unhost 
tablets
 +      setTabletHostingGoal(client, metaCopy1, t3, 
TabletHostingGoal.NEVER.name());
 +      tabletMgmtParams = createParameters(client);
 +      assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have four tablets with hosting goal changes");
 +
 +      // test the assigned case (no location)
 +      removeLocation(client, metaCopy1, t3);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have two tablets without a loc");
 +
 +      // Test setting the operation id on one of the tablets in table t1. 
Table t1 has two tablets
 +      // w/o a location. Only one should need attention because of the 
operation id.
 +      setOperationId(client, metaCopy1, t1, new Text("some split"), 
TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, 
tabletMgmtParams),
 +          "Should have tablets needing attention because of operation id");
 +
 +      // test the cases where the assignment is to a dead tserver
 +      reassignLocation(client, metaCopy2, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, 
tabletMgmtParams),
 +          "Only 1 of 2 tablets in table t1 should be returned");
 +
 +      // Remove location and set merge operation id on both tablets
 +      // These tablets should not need attention as they have no WALs
 +      setTabletHostingGoal(client, metaCopy4, t4, 
TabletHostingGoal.ALWAYS.name());
 +      removeLocation(client, metaCopy4, t4);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Tablets have no location and a hosting goal of always, so they 
should need attention");
 +
 +      // Test MERGING and SPLITTING do not need attention with no location or 
wals
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.MERGING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for merge as they have no 
location");
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.SPLITTING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for merge as they have no 
location");
 +
 +      // Create a log entry for one of the tablets, this tablet will now need 
attention
 +      // for both MERGING and SPLITTING
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.MERGING);
 +      createLogEntry(client, metaCopy4, t4);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +      // Switch op to SPLITTING which should also need attention like MERGING
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +
 +      // Switch op to delete, no tablets should need attention even with WALs
 +      setOperationId(client, metaCopy4, t4, null, 
TabletOperationType.DELETING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have no tablets needing attention for delete");
 +
 +      // test the bad tablet location state case (inconsistent metadata)
 +      tabletMgmtParams = createParameters(client);
 +      addDuplicateLocation(client, metaCopy3, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, 
tabletMgmtParams),
 +          "Should have 1 tablet that needs a metadata repair");
 +
 +      // test the volume replacements case. Need to insert some files into
 +      // the metadata for t4, then run the TabletManagementIterator with
 +      // volume replacements
 +      addFiles(client, metaCopy4, t4);
 +      List<Pair<Path,Path>> replacements = new ArrayList<>();
 +      replacements.add(new Pair<Path,Path>(new 
Path("file:/vol1/accumulo/inst_id"),
 +          new Path("file:/vol2/accumulo/inst_id")));
 +      tabletMgmtParams = createParameters(client, replacements);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
 +          "Should have one tablet that needs a volume replacement");
 +
 +      // clean up
 +      dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, 
metaCopy4);
 +    }
 +  }
 +
 +  private void setTabletHostingGoal(AccumuloClient client, String table, 
String tableNameToModify,
 +      String state) throws TableNotFoundException, MutationsRejectedException 
{
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      scanner.setRange(new KeyExtent(tableIdToModify, null, 
null).toMetaRange());
 +      for (Entry<Key,Value> entry : scanner) {
 +        Mutation m = new Mutation(entry.getKey().getRow());
 +        m.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(),
 +            HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), 
entry.getKey().getTimestamp() + 1,
 +            new Value(state));
 +        try (BatchWriter bw = client.createBatchWriter(table)) {
 +          bw.addMutation(m);
 +        }
 +      }
 +    }
 +  }
 +
 +  private void addDuplicateLocation(AccumuloClient client, String table, 
String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, 
null).toMetaRow());
 +    m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new 
Value("fake:9005"));
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +  }
 +
 +  private void addFiles(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, 
null).toMetaRow());
 +    m.put(DataFileColumnFamily.NAME,
 +        new Text(StoredTabletFile
 +            
.serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")),
 +        new Value(new DataFileValue(0, 0, 0).encode()));
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +    try {
 +      client.createScanner(table).iterator()
 +          .forEachRemaining(e -> System.out.println(e.getKey() + "-> " + 
e.getValue()));
 +    } catch (TableNotFoundException e) {
 +      // TODO Auto-generated catch block
 +      e.printStackTrace();
 +    } catch (AccumuloSecurityException e) {
 +      // TODO Auto-generated catch block
 +      e.printStackTrace();
 +    } catch (AccumuloException e) {
 +      // TODO Auto-generated catch block
 +      e.printStackTrace();
 +    }
 +  }
 +
 +  private void reassignLocation(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      scanner.setRange(new KeyExtent(tableIdToModify, null, 
null).toMetaRange());
 +      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +      Entry<Key,Value> entry = scanner.iterator().next();
 +      Mutation m = new Mutation(entry.getKey().getRow());
 +      m.putDelete(entry.getKey().getColumnFamily(), 
entry.getKey().getColumnQualifier(),
 +          entry.getKey().getTimestamp());
 +      m.put(entry.getKey().getColumnFamily(), new Text("1234567"),
 +          entry.getKey().getTimestamp() + 1, new Value("fake:9005"));
 +      try (BatchWriter bw = client.createBatchWriter(table)) {
 +        bw.addMutation(m);
 +      }
 +    }
 +  }
 +
 +  // Sets an operation type on all tablets up to the end row
 +  private void setOperationId(AccumuloClient client, String table, String 
tableNameToModify,
 +      Text end, TabletOperationType opType) throws TableNotFoundException {
 +    var opid = TabletOperationId.from(opType, 42L);
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    try (TabletsMetadata tabletsMetadata =
 +        getServerContext().getAmple().readTablets().forTable(tableIdToModify)
 +            .overlapping(null, end != null ? 
TabletsSection.encodeRow(tableIdToModify, end) : null)
 +            .fetch(ColumnType.PREV_ROW).build()) {
 +      for (TabletMetadata tabletMetadata : tabletsMetadata) {
 +        Mutation m = new Mutation(tabletMetadata.getExtent().toMetaRow());
 +        MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m,
 +            new Value(opid.canonical()));
 +        try (BatchWriter bw = client.createBatchWriter(table)) {
 +          bw.addMutation(m);
 +        } catch (MutationsRejectedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  private void removeLocation(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    BatchDeleter deleter = client.createBatchDeleter(table, 
Authorizations.EMPTY, 1);
 +    deleter
 +        .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, 
null).toMetaRange()));
 +    deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +    deleter.delete();
 +    deleter.close();
 +  }
 +
 +  private int findTabletsNeedingAttention(AccumuloClient client, String table,
 +      TabletManagementParameters tabletMgmtParams) throws 
TableNotFoundException, IOException {
 +    int results = 0;
 +    List<KeyExtent> resultList = new ArrayList<>();
 +    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) 
{
 +      TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
 +      scanner.updateScanIteratorOption("tabletChange", "debug", "1");
 +      for (Entry<Key,Value> e : scanner) {
 +        if (e != null) {
 +          TabletManagement mti = TabletManagementIterator.decode(e);
 +          results++;
 +          log.debug("Found tablets that changed state: {}", 
mti.getTabletMetadata().getExtent());
 +          log.debug("metadata: {}", mti.getTabletMetadata());
 +          resultList.add(mti.getTabletMetadata().getExtent());
 +        }
 +      }
 +    }
 +    log.debug("Tablets in flux: {}", resultList);
 +    return results;
 +  }
 +
 +  private void createTable(AccumuloClient client, String t, boolean online)
 +      throws AccumuloSecurityException, AccumuloException, 
TableNotFoundException,
 +      TableExistsException {
 +    SortedSet<Text> partitionKeys = new TreeSet<>();
 +    partitionKeys.add(new Text("some split"));
 +    NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(partitionKeys);
 +    client.tableOperations().create(t, ntc);
 +    client.tableOperations().online(t);
 +    if (!online) {
 +      client.tableOperations().offline(t, true);
 +    }
 +  }
 +
 +  /**
 +   * Create a copy of the source table by first gathering all the rows of the 
source in a list of
 +   * mutations. Then create the copy of the table and apply the mutations to 
the copy.
 +   */
 +  private void copyTable(AccumuloClient client, String source, String copy)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
 +      TableExistsException {
 +    try {
 +      dropTables(client, copy);
 +    } catch (TableNotFoundException ex) {
 +      // ignored
 +    }
 +
 +    log.info("Gathering rows to copy {} ", source);
 +    List<Mutation> mutations = new ArrayList<>();
 +
 +    try (Scanner scanner = client.createScanner(source, 
Authorizations.EMPTY)) {
 +      RowIterator rows = new RowIterator(new IsolatedScanner(scanner));
 +
 +      while (rows.hasNext()) {
 +        Iterator<Entry<Key,Value>> row = rows.next();
 +        Mutation m = null;
 +
 +        while (row.hasNext()) {
 +          Entry<Key,Value> entry = row.next();
 +          Key k = entry.getKey();
 +          if (m == null) {
 +            m = new Mutation(k.getRow());
 +          }
 +
 +          m.put(k.getColumnFamily(), k.getColumnQualifier(), 
k.getColumnVisibilityParsed(),
 +              k.getTimestamp(), entry.getValue());
 +        }
 +
 +        mutations.add(m);
 +      }
 +    }
 +
 +    // metadata should be stable with only 6 rows (2 for each table)
 +    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
 +    assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for 
each table)");
 +    client.tableOperations().create(copy);
 +
 +    try (BatchWriter writer = client.createBatchWriter(copy)) {
 +      for (Mutation m : mutations) {
 +        writer.addMutation(m);
 +      }
 +    }
 +
 +    log.info("Finished creating copy " + copy);
 +  }
 +
 +  private void dropTables(AccumuloClient client, String... tables)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
 +    for (String t : tables) {
 +      client.tableOperations().delete(t);
 +    }
 +  }
 +
 +  // Creates a log entry on the "some split" extent, this could be modified 
easily to support
 +  // other extents
 +  private void createLogEntry(AccumuloClient client, String table, String 
tableNameToModify)
 +      throws MutationsRejectedException, TableNotFoundException {
 +    TableId tableIdToModify =
 +        
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), 
null);
 +    Mutation m = new Mutation(extent.toMetaRow());
 +    String fileName = "file:/accumulo/wal/localhost+9997/" + 
UUID.randomUUID().toString();
 +    LogEntry logEntry = new LogEntry(fileName);
-     
m.at().family(LogColumnFamily.NAME).qualifier(logEntry.getColumnQualifier())
-         .put(logEntry.getValue());
++    logEntry.addToMutation(m);
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +  }
 +
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client) {
 +    return createParameters(client, List.of());
 +  }
 +
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client,
 +      List<Pair<Path,Path>> replacements) {
 +    var context = (ClientContext) client;
 +    Set<TableId> onlineTables = 
Sets.filter(context.getTableIdToNameMap().keySet(),
 +        tableId -> context.getTableState(tableId) == TableState.ONLINE);
 +
 +    HashSet<TServerInstance> tservers = new HashSet<>();
 +    for (String tserver : context.instanceOperations().getTabletServers()) {
 +      try {
 +        var zPath = 
ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId())
 +            + Constants.ZTSERVERS + "/" + tserver);
 +        long sessionId = ServiceLock.getSessionId(context.getZooCache(), 
zPath);
 +        tservers.add(new TServerInstance(tserver, sessionId));
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return new TabletManagementParameters(ManagerState.NORMAL,
 +        Map.of(
 +            Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, 
Ample.DataLevel.METADATA, true),
 +        onlineTables,
 +        new LiveTServerSet.LiveTServersSnapshot(tservers,
 +            Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
 +        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, 
replacements);
 +  }
 +}


Reply via email to