This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0c79e44e2df29f8f9d8617fb698167a60c0512bb
Merge: 15be02a8ff f675fdd859
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Mar 13 21:14:36 2025 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/core/conf/Property.java    |   4 +-
 .../core/util/threads/ThreadPoolNames.java         |   1 +
 .../accumulo/core/util/threads/ThreadPools.java    |   3 +
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  | 116 +++++++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java       |  33 ++++--
 5 files changed, 124 insertions(+), 33 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 15c3e72871,ff0cad4bab..4f3b8fb984
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -772,8 -940,21 +772,10 @@@ public enum Property 
        "1.3.5"),
    GC_PORT("gc.port.client", "9998", PropertyType.PORT,
        "The listening port for the garbage collector's monitor service.", 
"1.3.5"),
+   GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
+       "The number of threads used to delete write-ahead logs and recovery 
files.", "2.1.4"),
    GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
-       "The number of threads used to delete RFiles and write-ahead logs.", 
"1.3.5"),
+       "The number of threads used to delete RFiles.", "1.3.5"),
 -  @Experimental
 -  GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", 
PropertyType.BOOLEAN,
 -      "GC will remove deletion candidates that are in-use from the metadata 
location. "
 -          + "This is expected to increase the speed of subsequent GC runs.",
 -      "2.1.3"),
 -  @Deprecated(since = "2.1.1", forRemoval = true)
 -  GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN,
 -      "Do not use the Trash, even if it is configured.", "1.5.0"),
 -  @Deprecated(since = "2.1.0", forRemoval = true)
 -  GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION,
 -      "Percent of gc cycles to trace.", "1.7.0"),
    GC_SAFEMODE("gc.safemode", "false", PropertyType.BOOLEAN,
        "Provides listing of files to be deleted but does not delete any 
files.", "2.1.0"),
    GC_USE_FULL_COMPACTION("gc.post.metadata.action", "flush", 
PropertyType.GC_POST_ACTION,
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index 8e1822893a,861a6ee89e..3584fb23a0
--- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@@ -34,16 -35,15 +34,17 @@@ public enum ThreadPoolNames 
    
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
    
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
    
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
 +  
COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"),
 +  
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
 +  
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
    GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+   GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
    GENERAL_SERVER_POOL("accumulo.pool.general.server"),
 -  GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
 +  SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
    IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
    
INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"),
 -  MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"),
 +  
INSTANCE_OPS_SCANS_FINDER_POOL("accumulo.pool.instance.ops.active.scans.finder"),
    MANAGER_FATE_POOL("accumulo.pool.manager.fate"),
 -  MANAGER_RENAME_POOL("accumulo.pool.manager.rename"),
    MANAGER_STATUS_POOL("accumulo.pool.manager.status"),
    
MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"),
    
METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"),
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 08d1d82989,136065beb4..2fc26fc91d
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@@ -22,18 -22,17 +22,19 @@@ import static java.util.concurrent.Time
  import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
+ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
 -import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
 +import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
  import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
@@@ -349,46 -376,12 +350,48 @@@ public class ThreadPools 
            builder.enableThreadPoolMetrics();
          }
          return builder.build();
 +      case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT:
 +        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL)
 +            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
 +        if (emitThreadPoolMetrics) {
 +          builder.enableThreadPoolMetrics();
 +        }
 +        return builder.build();
 +      case TSERV_CONDITIONAL_UPDATE_THREADS_META:
 +        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL)
 +            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
 +        if (emitThreadPoolMetrics) {
 +          builder.enableThreadPoolMetrics();
 +        }
 +        return builder.build();
 +      case TSERV_CONDITIONAL_UPDATE_THREADS_USER:
 +        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL)
 +            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
 +        if (emitThreadPoolMetrics) {
 +          builder.enableThreadPoolMetrics();
 +        }
 +        return builder.build();
+       case GC_DELETE_WAL_THREADS:
+         return 
getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
        case GC_DELETE_THREADS:
          return 
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
 -      case REPLICATION_WORKER_THREADS:
 -        builder = 
getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p));
 +      case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:
 +        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p))
 +            .withTimeOut(60L, MILLISECONDS);
 +        if (emitThreadPoolMetrics) {
 +          builder.enableThreadPoolMetrics();
 +        }
 +        return builder.build();
 +      case COMPACTION_COORDINATOR_RESERVATION_THREADS_META:
 +        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p))
 +            .withTimeOut(60L, MILLISECONDS);
 +        if (emitThreadPoolMetrics) {
 +          builder.enableThreadPoolMetrics();
 +        }
 +        return builder.build();
 +      case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER:
 +        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p))
 +            .withTimeOut(60L, MILLISECONDS);
          if (emitThreadPoolMetrics) {
            builder.enableThreadPoolMetrics();
          }
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 4d2f1b4dc6,4d9c4e745a..a60c955da5
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -34,20 -29,30 +35,26 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
  import java.util.stream.Stream;
  
 -import org.apache.accumulo.core.client.Scanner;
 -import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.data.Key;
 -import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 -import org.apache.accumulo.core.metadata.MetadataTable;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
  import org.apache.accumulo.core.metadata.TabletState;
  import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 -import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.metadata.schema.filters.GcWalsFilter;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.Pair;
+ import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.log.WalStateManager;
@@@ -236,19 -273,21 +243,21 @@@ public class GarbageCollectWriteAheadLo
      return result;
    }
  
-   private long removeFile(Path path) {
-     try {
-       if (!fs.moveToTrash(path)) {
-         fs.deleteRecursively(path);
+   private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, 
AtomicLong counter,
+       String msg) {
+     return deleteThreadPool.submit(() -> {
+       try {
+         log.debug(msg);
 -        if (!useTrash || !fs.moveToTrash(path)) {
++        if (!fs.moveToTrash(path)) {
+           fs.deleteRecursively(path);
+         }
+         counter.incrementAndGet();
+       } catch (FileNotFoundException ex) {
+         // ignored
+       } catch (IOException ex) {
+         log.error("Unable to delete {}", path, ex);
        }
-       return 1;
-     } catch (FileNotFoundException ex) {
-       // ignored
-     } catch (IOException ex) {
-       log.error("Unable to delete wal {}", path, ex);
-     }
- 
-     return 0;
+     });
    }
  
    private long removeFiles(Collection<Pair<WalState,Path>> collection, final 
GCStatus status) {
@@@ -261,14 -332,49 +302,45 @@@
    }
  
    private long removeFiles(Collection<Path> values) {
-     long count = 0;
-     for (Path path : values) {
-       log.debug("Removing recovery log {}", path);
-       count += removeFile(path);
+ 
+     final ExecutorService deleteThreadPool = 
ThreadPools.getServerThreadPools()
+         .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+     final Map<Path,Future<?>> futures = new HashMap<>(values.size());
+     final AtomicLong counter = new AtomicLong();
+ 
+     try {
+       for (Path path : values) {
+         futures.put(path,
+             removeFile(deleteThreadPool, path, counter, "Removing recovery 
log " + path));
+       }
+ 
+       while (!futures.isEmpty()) {
+         Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+         while (iter.hasNext()) {
+           Entry<Path,Future<?>> f = iter.next();
+           if (f.getValue().isDone()) {
+             try {
+               iter.remove();
+               f.getValue().get();
+             } catch (InterruptedException | ExecutionException e) {
+               throw new RuntimeException(
+                   "Uncaught exception deleting recovery log file" + 
f.getKey(), e);
+             }
+           }
+         }
+         try {
+           Thread.sleep(500);
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           throw new RuntimeException("Interrupted while sleeping", e);
+         }
+       }
+     } finally {
+       deleteThreadPool.shutdownNow();
      }
-     return count;
+     return counter.get();
    }
  
 -  private UUID path2uuid(Path path) {
 -    return UUID.fromString(path.getName());
 -  }
 -
    private Map<UUID,TServerInstance> 
removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates,
        Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> 
logsState,
        Map<UUID,Path> recoveryLogs) {
diff --cc 
server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 75707bb58c,1cb8cd61f6..896afcc2c3
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@@ -30,7 -27,11 +30,9 @@@ import java.util.Set
  import java.util.UUID;
  import java.util.stream.Stream;
  
 -import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.data.Key;
 -import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@@ -80,9 -94,9 +82,10 @@@ public class GarbageCollectWriteAheadLo
  
    @Test
    public void testRemoveUnusedLog() throws Exception {
+     AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
      ServerContext context = EasyMock.createMock(ServerContext.class);
      VolumeManager fs = EasyMock.createMock(VolumeManager.class);
 +    
EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes();
      WalStateManager marker = EasyMock.createMock(WalStateManager.class);
      LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
  
@@@ -97,11 -113,13 +102,11 @@@
      EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
      marker.removeWalMarker(server1, id);
      EasyMock.expectLastCall().once();
-     EasyMock.replay(context, fs, marker, tserverSet);
+     EasyMock.replay(conf, context, fs, marker, tserverSet);
 -    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
 -        tabletOnServer1List) {
 +    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, 
marker) {
        @Override
 -      @Deprecated
 -      protected int removeReplicationEntries(Map<UUID,TServerInstance> 
candidates) {
 -        return 0;
 +      protected Map<UUID,Path> getSortedWALogs() {
 +        return Collections.emptyMap();
        }
  
        @Override
@@@ -110,10 -128,7 +115,10 @@@
        }
      };
      gc.collect(status);
 +    assertThrows(IllegalStateException.class, () -> gc.collect(status),
 +        "Should only be able to call collect once");
 +
-     EasyMock.verify(context, fs, marker, tserverSet);
+     EasyMock.verify(conf, context, fs, marker, tserverSet);
    }
  
    @Test
@@@ -131,30 -149,33 +139,31 @@@
  
      EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
      EasyMock.expect(marker.state(server1, id)).andReturn(new 
Pair<>(WalState.CLOSED, path));
-     EasyMock.replay(context, marker, tserverSet, fs);
+     EasyMock.replay(conf, context, marker, tserverSet, fs);
 -    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
 -        tabletOnServer1List) {
 -      @Override
 -      @Deprecated
 -      protected int removeReplicationEntries(Map<UUID,TServerInstance> 
candidates) {
 -        return 0;
 -      }
 +    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, 
marker) {
+ 
        @Override
        protected Map<UUID,Path> getSortedWALogs() {
          return Collections.emptyMap();
        }
 +
 +      @Override
 +      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
 +        return tabletOnServer1List;
 +      }
      };
      gc.collect(status);
- 
-     EasyMock.verify(context, marker, tserverSet, fs);
+     EasyMock.verify(conf, context, marker, tserverSet, fs);
    }
  
    @Test
    public void deleteUnreferencedLogOnDeadServer() throws Exception {
+     AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
      ServerContext context = EasyMock.createMock(ServerContext.class);
      VolumeManager fs = EasyMock.createMock(VolumeManager.class);
 +    
EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes();
      WalStateManager marker = EasyMock.createMock(WalStateManager.class);
      LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 -    Scanner mscanner = EasyMock.createMock(Scanner.class);
 -    Scanner rscanner = EasyMock.createMock(Scanner.class);
  
      GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
  
@@@ -170,21 -206,16 +181,20 @@@
      EasyMock.expectLastCall().once();
      marker.forget(server2);
      EasyMock.expectLastCall().once();
-     EasyMock.replay(context, fs, marker, tserverSet);
 -    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
 -    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
 -        tabletOnServer1List) {
++    EasyMock.replay(conf, context, fs, marker, tserverSet);
 +    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, 
marker) {
        @Override
        protected Map<UUID,Path> getSortedWALogs() {
          return Collections.emptyMap();
        }
 +
 +      @Override
 +      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
 +        return tabletOnServer1List;
 +      }
      };
      gc.collect(status);
- 
-     EasyMock.verify(context, fs, marker, tserverSet);
 -    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
++    EasyMock.verify(conf, context, fs, marker, tserverSet);
    }
  
    @Test
@@@ -203,22 -239,79 +216,22 @@@
      EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
      EasyMock.expect(marker.state(server2, id)).andReturn(new 
Pair<>(WalState.OPEN, path));
  
-     EasyMock.replay(context, fs, marker, tserverSet);
 -    EasyMock.expect(context.createScanner(REPL_TABLE_NAME, 
Authorizations.EMPTY))
 -        .andReturn(rscanner);
 -    rscanner.fetchColumnFamily(STATUS_SECTION_NAME);
 -    EasyMock.expectLastCall().once();
 -    EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
 -
 -    EasyMock.expect(context.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY))
 -        .andReturn(mscanner);
 -    mscanner.fetchColumnFamily(ReplicationSection.COLF);
 -    EasyMock.expectLastCall().once();
 -    mscanner.setRange(ReplicationSection.getRange());
 -    EasyMock.expectLastCall().once();
 -    EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
 -    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
 -    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
 -        tabletOnServer2List) {
 -      @Override
 -      protected Map<UUID,Path> getSortedWALogs() {
 -        return Collections.emptyMap();
 -      }
 -    };
++    EasyMock.replay(conf, context, fs, marker, tserverSet);
 +    GarbageCollectWriteAheadLogs gc =
 +        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {
 +          @Override
 +          protected Map<UUID,Path> getSortedWALogs() {
 +            return Collections.emptyMap();
 +          }
 +
 +          @Override
 +          Stream<TabletMetadata> createStore(Set<TServerInstance> 
liveTservers) {
 +            return tabletOnServer2List;
 +          }
 +        };
      gc.collect(status);
 -    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
 -  }
 -
 -  @Test
 -  public void replicationDelaysFileCollection() throws Exception {
 -    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
 -    ServerContext context = EasyMock.createMock(ServerContext.class);
 -    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
 -    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
 -    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 -    Scanner mscanner = EasyMock.createMock(Scanner.class);
 -    Scanner rscanner = EasyMock.createMock(Scanner.class);
 -    String row = ReplicationSection.getRowPrefix() + path;
 -    String colf = ReplicationSection.COLF.toString();
 -    String colq = "1";
 -    Map<Key,Value> replicationWork =
 -        Collections.singletonMap(new Key(row, colf, colq), new Value());
 -
 -    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 -
 -    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
 -    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
 -    tserverSet.scanServers();
 -    EasyMock.expectLastCall();
 -    
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
  
-     EasyMock.verify(context, fs, marker, tserverSet);
 -    EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
 -    EasyMock.expect(marker.state(server1, id)).andReturn(new 
Pair<>(WalState.UNREFERENCED, path));
 -
 -    EasyMock.expect(context.createScanner(REPL_TABLE_NAME, 
Authorizations.EMPTY))
 -        .andReturn(rscanner);
 -    rscanner.fetchColumnFamily(STATUS_SECTION_NAME);
 -    EasyMock.expectLastCall().once();
 -    EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
 -
 -    EasyMock.expect(context.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY))
 -        .andReturn(mscanner);
 -    mscanner.fetchColumnFamily(ReplicationSection.COLF);
 -    EasyMock.expectLastCall().once();
 -    mscanner.setRange(ReplicationSection.getRange());
 -    EasyMock.expectLastCall().once();
 -    
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
 -    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
 -    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
 -        tabletOnServer1List) {
 -      @Override
 -      protected Map<UUID,Path> getSortedWALogs() {
 -        return Collections.emptyMap();
 -      }
 -    };
 -    gc.collect(status);
 -    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, 
mscanner);
++    EasyMock.verify(conf, context, fs, marker, tserverSet);
    }
 +
  }

Reply via email to