This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f675fdd859 Make garbage collecting wal and recovery files faster (#5399)
f675fdd859 is described below

commit f675fdd85950ba3473939460975a22a29417e6fc
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Mar 13 16:25:58 2025 -0400

    Make garbage collecting wal and recovery files faster (#5399)
    
    Created a new property, gc.threads.delete.wal, which
    configures the number of threads used to delete
    WAL and recovery files.
    
    Closes #5397
---
 .../org/apache/accumulo/core/conf/Property.java    |   4 +-
 .../core/util/threads/ThreadPoolNames.java         |   1 +
 .../accumulo/core/util/threads/ThreadPools.java    |   3 +
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  | 116 +++++++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java       |  37 +++++--
 5 files changed, 128 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 942dfeea45..ff0cad4bab 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -940,8 +940,10 @@ public enum Property {
       "1.3.5"),
   GC_PORT("gc.port.client", "9998", PropertyType.PORT,
       "The listening port for the garbage collector's monitor service.", 
"1.3.5"),
+  GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
+      "The number of threads used to delete write-ahead logs and recovery 
files.", "2.1.4"),
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
-      "The number of threads used to delete RFiles and write-ahead logs.", 
"1.3.5"),
+      "The number of threads used to delete RFiles.", "1.3.5"),
   @Experimental
   GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", 
PropertyType.BOOLEAN,
       "GC will remove deletion candidates that are in-use from the metadata 
location. "
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index 43a4a24b70..861a6ee89e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -36,6 +36,7 @@ public enum ThreadPoolNames {
   
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
   
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
   GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+  GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
   GENERAL_SERVER_POOL("accumulo.pool.general.server"),
   GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
   IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 240a41459c..136065beb4 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
@@ -375,6 +376,8 @@ public class ThreadPools {
           builder.enableThreadPoolMetrics();
         }
         return builder.build();
+      case GC_DELETE_WAL_THREADS:
+        return 
getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
       case GC_DELETE_THREADS:
         return 
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
       case REPLICATION_WORKER_THREADS:
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 0fe4843ad1..4d9c4e745a 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -23,16 +23,22 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -46,6 +52,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.WalStateManager;
@@ -266,37 +273,102 @@ public class GarbageCollectWriteAheadLogs {
     return result;
   }
 
-  private long removeFile(Path path) {
-    try {
-      if (!useTrash || !fs.moveToTrash(path)) {
-        fs.deleteRecursively(path);
+  private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, 
AtomicLong counter,
+      String msg) {
+    return deleteThreadPool.submit(() -> {
+      try {
+        log.debug(msg);
+        if (!useTrash || !fs.moveToTrash(path)) {
+          fs.deleteRecursively(path);
+        }
+        counter.incrementAndGet();
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ex) {
+        log.error("Unable to delete {}", path, ex);
       }
-      return 1;
-    } catch (FileNotFoundException ex) {
-      // ignored
-    } catch (IOException ex) {
-      log.error("Unable to delete wal {}", path, ex);
-    }
-
-    return 0;
+    });
   }
 
   private long removeFiles(Collection<Pair<WalState,Path>> collection, final 
GCStatus status) {
-    for (Pair<WalState,Path> stateFile : collection) {
-      Path path = stateFile.getSecond();
-      log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
-      status.currentLog.deleted += removeFile(path);
+
+    final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+    final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
+    final AtomicLong counter = new AtomicLong();
+
+    try {
+      for (Pair<WalState,Path> stateFile : collection) {
+        Path path = stateFile.getSecond();
+        futures.put(path, removeFile(deleteThreadPool, path, counter,
+            "Removing " + stateFile.getFirst() + " WAL " + path));
+      }
+
+      while (!futures.isEmpty()) {
+        Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<Path,Future<?>> f = iter.next();
+          if (f.getValue().isDone()) {
+            try {
+              iter.remove();
+              f.getValue().get();
+            } catch (InterruptedException | ExecutionException e) {
+              throw new RuntimeException("Uncaught exception deleting wal 
file" + f.getKey(), e);
+            }
+          }
+        }
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while sleeping", e);
+        }
+      }
+    } finally {
+      deleteThreadPool.shutdownNow();
     }
-    return status.currentLog.deleted;
+    status.currentLog.deleted += counter.get();
+    return counter.get();
   }
 
   private long removeFiles(Collection<Path> values) {
-    long count = 0;
-    for (Path path : values) {
-      log.debug("Removing recovery log {}", path);
-      count += removeFile(path);
+
+    final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+    final Map<Path,Future<?>> futures = new HashMap<>(values.size());
+    final AtomicLong counter = new AtomicLong();
+
+    try {
+      for (Path path : values) {
+        futures.put(path,
+            removeFile(deleteThreadPool, path, counter, "Removing recovery log 
" + path));
+      }
+
+      while (!futures.isEmpty()) {
+        Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<Path,Future<?>> f = iter.next();
+          if (f.getValue().isDone()) {
+            try {
+              iter.remove();
+              f.getValue().get();
+            } catch (InterruptedException | ExecutionException e) {
+              throw new RuntimeException(
+                  "Uncaught exception deleting recovery log file" + 
f.getKey(), e);
+            }
+          }
+        }
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while sleeping", e);
+        }
+      }
+    } finally {
+      deleteThreadPool.shutdownNow();
     }
-    return count;
+    return counter.get();
   }
 
   private UUID path2uuid(Path path) {
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 544407b495..1cb8cd61f6 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -28,6 +28,8 @@ import java.util.UUID;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -92,6 +94,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   @Test
   public void testRemoveUnusedLog() throws Exception {
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -99,6 +102,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
     tserverSet.scanServers();
     EasyMock.expectLastCall();
     
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -108,7 +113,7 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
     marker.removeWalMarker(server1, id);
     EasyMock.expectLastCall().once();
-    EasyMock.replay(context, fs, marker, tserverSet);
+    EasyMock.replay(conf, context, fs, marker, tserverSet);
     var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
         tabletOnServer1List) {
       @Override
@@ -123,11 +128,12 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     };
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet);
+    EasyMock.verify(conf, context, fs, marker, tserverSet);
   }
 
   @Test
   public void testKeepClosedLog() throws Exception {
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -135,13 +141,15 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
     tserverSet.scanServers();
     EasyMock.expectLastCall();
     
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
 
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
     EasyMock.expect(marker.state(server1, id)).andReturn(new 
Pair<>(WalState.CLOSED, path));
-    EasyMock.replay(context, marker, tserverSet, fs);
+    EasyMock.replay(conf, context, marker, tserverSet, fs);
     var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
         tabletOnServer1List) {
       @Override
@@ -156,11 +164,12 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     };
     gc.collect(status);
-    EasyMock.verify(context, marker, tserverSet, fs);
+    EasyMock.verify(conf, context, marker, tserverSet, fs);
   }
 
   @Test
   public void deleteUnreferencedLogOnDeadServer() throws Exception {
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -170,6 +179,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
     tserverSet.scanServers();
     EasyMock.expectLastCall();
     
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -195,7 +206,7 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expectLastCall().once();
     marker.forget(server2);
     EasyMock.expectLastCall().once();
-    EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
     var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
         tabletOnServer1List) {
       @Override
@@ -204,11 +215,12 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     };
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
   }
 
   @Test
   public void ignoreReferenceLogOnDeadServer() throws Exception {
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -218,6 +230,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
     tserverSet.scanServers();
     EasyMock.expectLastCall();
     
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -238,7 +252,7 @@ public class GarbageCollectWriteAheadLogsTest {
     mscanner.setRange(ReplicationSection.getRange());
     EasyMock.expectLastCall().once();
     EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
-    EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
     var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
         tabletOnServer2List) {
       @Override
@@ -247,11 +261,12 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     };
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
   }
 
   @Test
   public void replicationDelaysFileCollection() throws Exception {
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -266,6 +281,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+    
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
     tserverSet.scanServers();
     EasyMock.expectLastCall();
     
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -286,7 +303,7 @@ public class GarbageCollectWriteAheadLogsTest {
     mscanner.setRange(ReplicationSection.getRange());
     EasyMock.expectLastCall().once();
     
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
-    EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
     var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
         tabletOnServer1List) {
       @Override
@@ -295,6 +312,6 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     };
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+    EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
   }
 }

Reply via email to