This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f675fdd859 Make garbage collecting wal and recovery files faster (#5399)
f675fdd859 is described below
commit f675fdd85950ba3473939460975a22a29417e6fc
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 13 16:25:58 2025 -0400
Make garbage collecting wal and recovery files faster (#5399)
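Created a new property, gc.threads.delete.wal (default 4), that configures
the number of threads the garbage collector uses to delete write-ahead log
(WAL) and recovery files. These deletions are now submitted concurrently to
a dedicated thread pool instead of being performed one at a time, and the
existing gc.threads.delete property now applies only to RFiles.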
Closes #5397
---
.../org/apache/accumulo/core/conf/Property.java | 4 +-
.../core/util/threads/ThreadPoolNames.java | 1 +
.../accumulo/core/util/threads/ThreadPools.java | 3 +
.../accumulo/gc/GarbageCollectWriteAheadLogs.java | 116 +++++++++++++++++----
.../gc/GarbageCollectWriteAheadLogsTest.java | 37 +++++--
5 files changed, 128 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 942dfeea45..ff0cad4bab 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -940,8 +940,10 @@ public enum Property {
"1.3.5"),
GC_PORT("gc.port.client", "9998", PropertyType.PORT,
"The listening port for the garbage collector's monitor service.",
"1.3.5"),
+ GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
+ "The number of threads used to delete write-ahead logs and recovery
files.", "2.1.4"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
- "The number of threads used to delete RFiles and write-ahead logs.",
"1.3.5"),
+ "The number of threads used to delete RFiles.", "1.3.5"),
@Experimental
GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false",
PropertyType.BOOLEAN,
"GC will remove deletion candidates that are in-use from the metadata
location. "
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index 43a4a24b70..861a6ee89e 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -36,6 +36,7 @@ public enum ThreadPoolNames {
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+ GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 240a41459c..136065beb4 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
@@ -375,6 +376,8 @@ public class ThreadPools {
builder.enableThreadPoolMetrics();
}
return builder.build();
+ case GC_DELETE_WAL_THREADS:
+ return
getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
case GC_DELETE_THREADS:
return
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
case REPLICATION_WORKER_THREADS:
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 0fe4843ad1..4d9c4e745a 100644
---
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -23,16 +23,22 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -46,6 +52,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.WalStateManager;
@@ -266,37 +273,102 @@ public class GarbageCollectWriteAheadLogs {
return result;
}
- private long removeFile(Path path) {
- try {
- if (!useTrash || !fs.moveToTrash(path)) {
- fs.deleteRecursively(path);
+ private Future<?> removeFile(ExecutorService deleteThreadPool, Path path,
AtomicLong counter,
+ String msg) {
+ return deleteThreadPool.submit(() -> {
+ try {
+ log.debug(msg);
+ if (!useTrash || !fs.moveToTrash(path)) {
+ fs.deleteRecursively(path);
+ }
+ counter.incrementAndGet();
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ex) {
+ log.error("Unable to delete {}", path, ex);
}
- return 1;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal {}", path, ex);
- }
-
- return 0;
+ });
}
private long removeFiles(Collection<Pair<WalState,Path>> collection, final
GCStatus status) {
- for (Pair<WalState,Path> stateFile : collection) {
- Path path = stateFile.getSecond();
- log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
- status.currentLog.deleted += removeFile(path);
+
+ final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+ .createExecutorService(context.getConfiguration(),
Property.GC_DELETE_WAL_THREADS);
+ final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
+ final AtomicLong counter = new AtomicLong();
+
+ try {
+ for (Pair<WalState,Path> stateFile : collection) {
+ Path path = stateFile.getSecond();
+ futures.put(path, removeFile(deleteThreadPool, path, counter,
+ "Removing " + stateFile.getFirst() + " WAL " + path));
+ }
+
+ while (!futures.isEmpty()) {
+ Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<Path,Future<?>> f = iter.next();
+ if (f.getValue().isDone()) {
+ try {
+ iter.remove();
+ f.getValue().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Uncaught exception deleting wal
file" + f.getKey(), e);
+ }
+ }
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while sleeping", e);
+ }
+ }
+ } finally {
+ deleteThreadPool.shutdownNow();
}
- return status.currentLog.deleted;
+ status.currentLog.deleted += counter.get();
+ return counter.get();
}
private long removeFiles(Collection<Path> values) {
- long count = 0;
- for (Path path : values) {
- log.debug("Removing recovery log {}", path);
- count += removeFile(path);
+
+ final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+ .createExecutorService(context.getConfiguration(),
Property.GC_DELETE_WAL_THREADS);
+ final Map<Path,Future<?>> futures = new HashMap<>(values.size());
+ final AtomicLong counter = new AtomicLong();
+
+ try {
+ for (Path path : values) {
+ futures.put(path,
+ removeFile(deleteThreadPool, path, counter, "Removing recovery log
" + path));
+ }
+
+ while (!futures.isEmpty()) {
+ Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<Path,Future<?>> f = iter.next();
+ if (f.getValue().isDone()) {
+ try {
+ iter.remove();
+ f.getValue().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(
+ "Uncaught exception deleting recovery log file" +
f.getKey(), e);
+ }
+ }
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while sleeping", e);
+ }
+ }
+ } finally {
+ deleteThreadPool.shutdownNow();
}
- return count;
+ return counter.get();
}
private UUID path2uuid(Path path) {
diff --git
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 544407b495..1cb8cd61f6 100644
---
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -28,6 +28,8 @@ import java.util.UUID;
import java.util.stream.Stream;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -92,6 +94,7 @@ public class GarbageCollectWriteAheadLogsTest {
@Test
public void testRemoveUnusedLog() throws Exception {
+ AccumuloConfiguration conf =
EasyMock.createMock(AccumuloConfiguration.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -99,6 +102,8 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
tserverSet.scanServers();
EasyMock.expectLastCall();
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -108,7 +113,7 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
marker.removeWalMarker(server1, id);
EasyMock.expectLastCall().once();
- EasyMock.replay(context, fs, marker, tserverSet);
+ EasyMock.replay(conf, context, fs, marker, tserverSet);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
tabletOnServer1List) {
@Override
@@ -123,11 +128,12 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet);
+ EasyMock.verify(conf, context, fs, marker, tserverSet);
}
@Test
public void testKeepClosedLog() throws Exception {
+ AccumuloConfiguration conf =
EasyMock.createMock(AccumuloConfiguration.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -135,13 +141,15 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
tserverSet.scanServers();
EasyMock.expectLastCall();
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new
Pair<>(WalState.CLOSED, path));
- EasyMock.replay(context, marker, tserverSet, fs);
+ EasyMock.replay(conf, context, marker, tserverSet, fs);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
tabletOnServer1List) {
@Override
@@ -156,11 +164,12 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, marker, tserverSet, fs);
+ EasyMock.verify(conf, context, marker, tserverSet, fs);
}
@Test
public void deleteUnreferencedLogOnDeadServer() throws Exception {
+ AccumuloConfiguration conf =
EasyMock.createMock(AccumuloConfiguration.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -170,6 +179,8 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
tserverSet.scanServers();
EasyMock.expectLastCall();
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -195,7 +206,7 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expectLastCall().once();
marker.forget(server2);
EasyMock.expectLastCall().once();
- EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
tabletOnServer1List) {
@Override
@@ -204,11 +215,12 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
}
@Test
public void ignoreReferenceLogOnDeadServer() throws Exception {
+ AccumuloConfiguration conf =
EasyMock.createMock(AccumuloConfiguration.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -218,6 +230,8 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
tserverSet.scanServers();
EasyMock.expectLastCall();
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -238,7 +252,7 @@ public class GarbageCollectWriteAheadLogsTest {
mscanner.setRange(ReplicationSection.getRange());
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
- EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
tabletOnServer2List) {
@Override
@@ -247,11 +261,12 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
}
@Test
public void replicationDelaysFileCollection() throws Exception {
+ AccumuloConfiguration conf =
EasyMock.createMock(AccumuloConfiguration.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -266,6 +281,8 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
tserverSet.scanServers();
EasyMock.expectLastCall();
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -286,7 +303,7 @@ public class GarbageCollectWriteAheadLogsTest {
mscanner.setRange(ReplicationSection.getRange());
EasyMock.expectLastCall().once();
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
- EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false,
marker,
tabletOnServer1List) {
@Override
@@ -295,6 +312,6 @@ public class GarbageCollectWriteAheadLogsTest {
}
};
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
+ EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
}
}