This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new f675fdd859 Make garbage collecting wal and recovery files faster (#5399) f675fdd859 is described below commit f675fdd85950ba3473939460975a22a29417e6fc Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Mar 13 16:25:58 2025 -0400 Make garbage collecting wal and recovery files faster (#5399) Created a new property, gc.threads.delete.wal, which is used to configure the number of threads to use for deleting wal and recovery files. Closes #5397 --- .../org/apache/accumulo/core/conf/Property.java | 4 +- .../core/util/threads/ThreadPoolNames.java | 1 + .../accumulo/core/util/threads/ThreadPools.java | 3 + .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 116 +++++++++++++++++---- .../gc/GarbageCollectWriteAheadLogsTest.java | 37 +++++-- 5 files changed, 128 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 942dfeea45..ff0cad4bab 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -940,8 +940,10 @@ public enum Property { "1.3.5"), GC_PORT("gc.port.client", "9998", PropertyType.PORT, "The listening port for the garbage collector's monitor service.", "1.3.5"), + GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT, + "The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"), GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, - "The number of threads used to delete RFiles and write-ahead logs.", "1.3.5"), + "The number of threads used to delete RFiles.", "1.3.5"), @Experimental GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", PropertyType.BOOLEAN, "GC will remove deletion candidates that are in-use from the metadata location. " diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index 43a4a24b70..861a6ee89e 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -36,6 +36,7 @@ public enum ThreadPoolNames { COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), + GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"), GENERAL_SERVER_POOL("accumulo.pool.general.server"), GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"), IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 240a41459c..136065beb4 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL; @@ -375,6 +376,8 @@ public class ThreadPools { builder.enableThreadPoolMetrics(); } return builder.build(); + case GC_DELETE_WAL_THREADS: + return getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); case REPLICATION_WORKER_THREADS: diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 0fe4843ad1..4d9c4e745a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -23,16 +23,22 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; @@ -46,6 +52,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; @@ -266,37 +273,102 @@ public class GarbageCollectWriteAheadLogs { return result; } - private long removeFile(Path path) { - try { - if (!useTrash || !fs.moveToTrash(path)) { - fs.deleteRecursively(path); + private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter, + String msg) { + return deleteThreadPool.submit(() -> { + try { + log.debug(msg); + if (!useTrash || !fs.moveToTrash(path)) { + fs.deleteRecursively(path); + } + counter.incrementAndGet(); + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete {}", path, ex); } - return 1; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); - } - - return 0; + }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { - for (Pair<WalState,Path> stateFile : collection) { - Path path = stateFile.getSecond(); - log.debug("Removing {} WAL {}", stateFile.getFirst(), path); - status.currentLog.deleted += removeFile(path); + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); + final AtomicLong counter = new AtomicLong(); + + try { + for (Pair<WalState,Path> stateFile : collection) { + Path path = stateFile.getSecond(); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sleeping", e); + } + } + } finally { + deleteThreadPool.shutdownNow(); } - return status.currentLog.deleted; + status.currentLog.deleted += counter.get(); + return counter.get(); } private long removeFiles(Collection<Path> values) { - long count = 0; - for (Path path : values) { - log.debug("Removing recovery log {}", path); - count += removeFile(path); + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); + final AtomicLong counter = new AtomicLong(); + + try { + for (Path path : values) { + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException( + "Uncaught exception deleting recovery log file" + f.getKey(), e); + } + } + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sleeping", e); + } + } + } finally { + deleteThreadPool.shutdownNow(); } - return count; + return counter.get(); } private UUID path2uuid(Path path) { diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 544407b495..1cb8cd61f6 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -28,6 +28,8 @@ import java.util.UUID; import java.util.stream.Stream; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -92,6 +94,7 @@ public class GarbageCollectWriteAheadLogsTest { @Test public void testRemoveUnusedLog() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -99,6 +102,8 @@ public class GarbageCollectWriteAheadLogsTest { GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); + EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); + EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); tserverSet.scanServers(); EasyMock.expectLastCall(); EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); @@ -108,7 +113,7 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once(); marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); - EasyMock.replay(context, fs, marker, tserverSet); + EasyMock.replay(conf, context, fs, marker, tserverSet); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, tabletOnServer1List) { @Override @@ -123,11 +128,12 @@ public class GarbageCollectWriteAheadLogsTest { } }; gc.collect(status); - EasyMock.verify(context, fs, marker, tserverSet); + EasyMock.verify(conf, context, fs, marker, tserverSet); } @Test public void testKeepClosedLog() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -135,13 +141,15 @@ public class GarbageCollectWriteAheadLogsTest { GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); + EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); + EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); tserverSet.scanServers(); EasyMock.expectLastCall(); EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); - EasyMock.replay(context, marker, tserverSet, fs); + EasyMock.replay(conf, context, marker, tserverSet, fs); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, tabletOnServer1List) { @Override @@ -156,11 +164,12 @@ public class GarbageCollectWriteAheadLogsTest { } }; gc.collect(status); - EasyMock.verify(context, marker, tserverSet, fs); + EasyMock.verify(conf, context, marker, tserverSet, fs); } @Test public void deleteUnreferencedLogOnDeadServer() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -170,6 +179,8 @@ public class GarbageCollectWriteAheadLogsTest { GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); + EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); + EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); tserverSet.scanServers(); EasyMock.expectLastCall(); EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); @@ -195,7 +206,7 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expectLastCall().once(); marker.forget(server2); EasyMock.expectLastCall().once(); - EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, tabletOnServer1List) { @Override @@ -204,11 +215,12 @@ public class GarbageCollectWriteAheadLogsTest { } }; gc.collect(status); - EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); } @Test public void ignoreReferenceLogOnDeadServer() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -218,6 +230,8 @@ public class GarbageCollectWriteAheadLogsTest { GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); + EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); + EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); tserverSet.scanServers(); EasyMock.expectLastCall(); EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); @@ -238,7 +252,7 @@ public class GarbageCollectWriteAheadLogsTest { mscanner.setRange(ReplicationSection.getRange()); EasyMock.expectLastCall().once(); EasyMock.expect(mscanner.iterator()).andReturn(emptyKV); - EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, tabletOnServer2List) { @Override @@ -247,11 +261,12 @@ public class GarbageCollectWriteAheadLogsTest { } }; gc.collect(status); - EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); } @Test public void replicationDelaysFileCollection() throws Exception { + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -266,6 +281,8 @@ public class GarbageCollectWriteAheadLogsTest { GCStatus status = new GCStatus(null, null, null, new GcCycleStats()); + EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2); + EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes(); tserverSet.scanServers(); EasyMock.expectLastCall(); EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); @@ -286,7 +303,7 @@ public class GarbageCollectWriteAheadLogsTest { mscanner.setRange(ReplicationSection.getRange()); EasyMock.expectLastCall().once(); EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator()); - EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, tabletOnServer1List) { @Override @@ -295,6 +312,6 @@ public class GarbageCollectWriteAheadLogsTest { } }; gc.collect(status); - EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner); + EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner); } }