This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0c79e44e2df29f8f9d8617fb698167a60c0512bb Merge: 15be02a8ff f675fdd859 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Mar 13 21:14:36 2025 +0000 Merge branch '2.1' .../org/apache/accumulo/core/conf/Property.java | 4 +- .../core/util/threads/ThreadPoolNames.java | 1 + .../accumulo/core/util/threads/ThreadPools.java | 3 + .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 116 +++++++++++++++++---- .../gc/GarbageCollectWriteAheadLogsTest.java | 33 ++++-- 5 files changed, 124 insertions(+), 33 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 15c3e72871,ff0cad4bab..4f3b8fb984 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -772,8 -940,21 +772,10 @@@ public enum Property "1.3.5"), GC_PORT("gc.port.client", "9998", PropertyType.PORT, "The listening port for the garbage collector's monitor service.", "1.3.5"), + GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT, + "The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"), GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, - "The number of threads used to delete RFiles and write-ahead logs.", "1.3.5"), + "The number of threads used to delete RFiles.", "1.3.5"), - @Experimental - GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", PropertyType.BOOLEAN, - "GC will remove deletion candidates that are in-use from the metadata location. " - + "This is expected to increase the speed of subsequent GC runs.", - "2.1.3"), - @Deprecated(since = "2.1.1", forRemoval = true) - GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, - "Do not use the Trash, even if it is configured.", "1.5.0"), - @Deprecated(since = "2.1.0", forRemoval = true) - GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, - "Percent of gc cycles to trace.", "1.7.0"), GC_SAFEMODE("gc.safemode", "false", PropertyType.BOOLEAN, "Provides listing of files to be deleted but does not delete any files.", "2.1.0"), GC_USE_FULL_COMPACTION("gc.post.metadata.action", "flush", PropertyType.GC_POST_ACTION, diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index 8e1822893a,861a6ee89e..3584fb23a0 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@@ -34,16 -35,15 +34,17 @@@ public enum ThreadPoolNames CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"), COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), + COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"), + COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"), + COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"), GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), + GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"), GENERAL_SERVER_POOL("accumulo.pool.general.server"), - GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"), + SERVICE_LOCK_POOL("accumulo.pool.service.lock"), IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"), - MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"), + INSTANCE_OPS_SCANS_FINDER_POOL("accumulo.pool.instance.ops.active.scans.finder"), MANAGER_FATE_POOL("accumulo.pool.manager.fate"), - MANAGER_RENAME_POOL("accumulo.pool.manager.rename"), MANAGER_STATUS_POOL("accumulo.pool.manager.status"), MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"), diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 08d1d82989,136065beb4..2fc26fc91d --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@@ -22,18 -22,17 +22,19 @@@ import static java.util.concurrent.Time import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL; + import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@@ -349,46 -376,12 +350,48 @@@ public class ThreadPools builder.enableThreadPoolMetrics(); } return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_META: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_USER: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case GC_DELETE_WAL_THREADS: + return getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); - case REPLICATION_WORKER_THREADS: - builder = getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p)); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT: + builder = getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_META: + builder = getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER: + builder = getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 4d2f1b4dc6,4d9c4e745a..a60c955da5 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -34,20 -29,30 +35,26 @@@ import java.util.Map import java.util.Map.Entry; import java.util.Set; import java.util.UUID; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.filters.GcWalsFilter; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; + import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; @@@ -236,19 -273,21 +243,21 @@@ public class GarbageCollectWriteAheadLo return result; } - private long removeFile(Path path) { - try { - if (!fs.moveToTrash(path)) { - fs.deleteRecursively(path); + private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter, + String msg) { + return deleteThreadPool.submit(() -> { + try { + log.debug(msg); - if (!useTrash || !fs.moveToTrash(path)) { ++ if (!fs.moveToTrash(path)) { + fs.deleteRecursively(path); + } + counter.incrementAndGet(); + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete {}", path, ex); } - return 1; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); - } - - return 0; + }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { @@@ -261,14 -332,49 +302,45 @@@ } private long removeFiles(Collection<Path> values) { - long count = 0; - for (Path path : values) { - log.debug("Removing recovery log {}", path); - count += removeFile(path); + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); + final AtomicLong counter = new AtomicLong(); + + try { + for (Path path : values) { + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException( + "Uncaught exception deleting recovery log file" + f.getKey(), e); + } + } + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sleeping", e); + } + } + } finally { + deleteThreadPool.shutdownNow(); } - return count; + return counter.get(); } - private UUID path2uuid(Path path) { - return UUID.fromString(path.getName()); - } - private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates, Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState, Map<UUID,Path> recoveryLogs) { diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 75707bb58c,1cb8cd61f6..896afcc2c3 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@@ -30,7 -27,11 +30,9 @@@ import java.util.Set import java.util.UUID; import java.util.stream.Stream; -import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TabletAvailability; + import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; @@@ -80,9 -94,9 +82,10 @@@ public class GarbageCollectWriteAheadLo @Test public void testRemoveUnusedLog() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); + EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes(); WalStateManager marker = EasyMock.createMock(WalStateManager.class); LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class); @@@ -97,11 -113,13 +102,11 @@@ EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once(); marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); - EasyMock.replay(context, fs, marker, tserverSet); + EasyMock.replay(conf, context, fs, marker, tserverSet); - var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, - tabletOnServer1List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { @Override - @Deprecated - protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) { - return 0; + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); } @Override @@@ -110,10 -128,7 +115,10 @@@ } }; gc.collect(status); + assertThrows(IllegalStateException.class, () -> gc.collect(status), + "Should only be able to call collect once"); + - EasyMock.verify(context, fs, marker, tserverSet); + EasyMock.verify(conf, context, fs, marker, tserverSet); } @Test @@@ -131,30 -149,33 +139,31 @@@ EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); - EasyMock.replay(context, marker, tserverSet, fs); + EasyMock.replay(conf, context, marker, tserverSet, fs); - var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, - tabletOnServer1List) { - @Override - @Deprecated - protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) { - return 0; - } + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override protected Map<UUID,Path> getSortedWALogs() { return Collections.emptyMap(); } + + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer1List; + } }; gc.collect(status); - - EasyMock.verify(context, marker, tserverSet, fs); + EasyMock.verify(conf, context, marker, tserverSet, fs); } @Test public void deleteUnreferencedLogOnDeadServer() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); + EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes(); WalStateManager marker = EasyMock.createMock(WalStateManager.class); LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class); - Scanner mscanner = EasyMock.createMock(Scanner.class); - Scanner rscanner = EasyMock.createMock(Scanner.class); GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); @@@ -170,21 -206,16 +181,20 @@@ EasyMock.expectLastCall().once(); marker.forget(server2); EasyMock.expectLastCall().once(); - EasyMock.replay(context, fs, marker, tserverSet); - EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); - var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, - tabletOnServer1List) { ++ EasyMock.replay(conf, context, fs, marker, tserverSet); + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { @Override protected Map<UUID,Path> getSortedWALogs() { return Collections.emptyMap(); } + + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer1List; + } }; gc.collect(status); - - EasyMock.verify(context, fs, marker, tserverSet); - EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); ++ EasyMock.verify(conf, context, fs, marker, tserverSet); } @Test @@@ -203,22 -239,79 +216,22 @@@ EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once(); EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); - EasyMock.replay(context, fs, marker, tserverSet); - EasyMock.expect(context.createScanner(REPL_TABLE_NAME, Authorizations.EMPTY)) - .andReturn(rscanner); - rscanner.fetchColumnFamily(STATUS_SECTION_NAME); - EasyMock.expectLastCall().once(); - EasyMock.expect(rscanner.iterator()).andReturn(emptyKV); - - EasyMock.expect(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) - .andReturn(mscanner); - mscanner.fetchColumnFamily(ReplicationSection.COLF); - EasyMock.expectLastCall().once(); - mscanner.setRange(ReplicationSection.getRange()); - EasyMock.expectLastCall().once(); - EasyMock.expect(mscanner.iterator()).andReturn(emptyKV); - EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); - var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, - tabletOnServer2List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; ++ EasyMock.replay(conf, context, fs, marker, tserverSet); + GarbageCollectWriteAheadLogs gc = + new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer2List; + } + }; gc.collect(status); - EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); - } - - @Test - public void replicationDelaysFileCollection() throws Exception { - AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); - ServerContext context = EasyMock.createMock(ServerContext.class); - VolumeManager fs = EasyMock.createMock(VolumeManager.class); - WalStateManager marker = EasyMock.createMock(WalStateManager.class); - LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class); - Scanner mscanner = EasyMock.createMock(Scanner.class); - Scanner rscanner = EasyMock.createMock(Scanner.class); - String row = ReplicationSection.getRowPrefix() + path; - String colf = ReplicationSection.COLF.toString(); - String colq = "1"; - Map<Key,Value> replicationWork = - Collections.singletonMap(new Key(row, colf, colq), new Value()); - - GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); - - EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); - EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); - tserverSet.scanServers(); - EasyMock.expectLastCall(); - EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); - EasyMock.verify(context, fs, marker, tserverSet); - EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); - EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.UNREFERENCED, path)); - - EasyMock.expect(context.createScanner(REPL_TABLE_NAME, Authorizations.EMPTY)) - .andReturn(rscanner); - rscanner.fetchColumnFamily(STATUS_SECTION_NAME); - EasyMock.expectLastCall().once(); - EasyMock.expect(rscanner.iterator()).andReturn(emptyKV); - - EasyMock.expect(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) - .andReturn(mscanner); - mscanner.fetchColumnFamily(ReplicationSection.COLF); - EasyMock.expectLastCall().once(); - mscanner.setRange(ReplicationSection.getRange()); - EasyMock.expectLastCall().once(); - EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator()); - EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); - var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, - tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; - gc.collect(status); - EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); ++ EasyMock.verify(conf, context, fs, marker, tserverSet); } + }