This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new fd111a22a6 Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)
fd111a22a6 is described below

commit fd111a22a6454837ad6076d5ba6fe54fc8934be5
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Fri Aug 16 15:48:58 2024 -0400

    Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)

    * Ensure resources are closed in GarbageCollectWriteAheadLogs
    * Adds Stream<TabletLocationState> stream() method to TabletStateStore that closes the underlying iterator when the stream is closed

    ---------

    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../server/manager/state/TabletStateStore.java     | 22 +++++
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  | 97 +++++++++++++---------
 .../gc/GarbageCollectWriteAheadLogsTest.java       | 34 ++++----
 3 files changed, 97 insertions(+), 56 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index 05072fd1b0..1894f7e7c7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -18,10 +18,16 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -52,6 +58,22 @@ public interface TabletStateStore extends Iterable<TabletLocationState> {
   @Override
   ClosableIterator<TabletLocationState> iterator();
 
+  /**
+   * Create a stream of
TabletLocationState that automatically closes the underlying iterator. + */ + default Stream<TabletLocationState> stream() { + ClosableIterator<TabletLocationState> iterator = this.iterator(); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .onClose(() -> { + try { + iterator.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + /** * Store the assigned locations in the data store. */ diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index ecfaeaba66..0fe4843ad1 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -57,8 +59,8 @@ import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterators; +import com.google.common.base.Preconditions; +import com.google.common.collect.Streams; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -71,7 +73,8 @@ public class GarbageCollectWriteAheadLogs { private final boolean useTrash; private final LiveTServerSet liveServers; private final WalStateManager walMarker; - private final Iterable<TabletLocationState> store; + private final AtomicBoolean hasCollected = new AtomicBoolean(false); + private final Stream<TabletLocationState> store; /** * Creates a new GC WAL object. 
@@ -82,38 +85,51 @@ public class GarbageCollectWriteAheadLogs { */ GarbageCollectWriteAheadLogs(final ServerContext context, final VolumeManager fs, final LiveTServerSet liveServers, boolean useTrash) { - this.context = context; - this.fs = fs; - this.useTrash = useTrash; - this.liveServers = liveServers; - this.walMarker = new WalStateManager(context); - this.store = () -> Iterators.concat( - TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(), - TabletStateStore.getStoreForLevel(DataLevel.METADATA, context).iterator(), - TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator()); + this(context, fs, liveServers, useTrash, new WalStateManager(context), createStore(context)); } /** - * Creates a new GC WAL object. Meant for testing -- allows mocked objects. + * Creates a new GC WAL object. Meant for testing -- allows for mocked objects. + * * * @param context the collection server's context * @param fs volume manager to use + * @param liveServers a started LiveTServerSet instance * @param useTrash true to move files to trash rather than delete them - * @param liveTServerSet a started LiveTServerSet instance + * @param walMarker a WalStateManager instance + * @param store a stream of TabletLocationState objects */ - @VisibleForTesting - GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs, boolean useTrash, - LiveTServerSet liveTServerSet, WalStateManager walMarker, - Iterable<TabletLocationState> store) { + GarbageCollectWriteAheadLogs(final ServerContext context, final VolumeManager fs, + final LiveTServerSet liveServers, boolean useTrash, final WalStateManager walMarker, + final Stream<TabletLocationState> store) { this.context = context; this.fs = fs; this.useTrash = useTrash; - this.liveServers = liveTServerSet; + this.liveServers = liveServers; this.walMarker = walMarker; this.store = store; } + private static Stream<TabletLocationState> createStore(final ServerContext context) { + var rootStream = 
TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).stream(); + var metadataStream = TabletStateStore.getStoreForLevel(DataLevel.METADATA, context).stream(); + var userStream = TabletStateStore.getStoreForLevel(DataLevel.USER, context).stream(); + return Streams.concat(rootStream, metadataStream, userStream).onClose(() -> { + try { + rootStream.close(); + } finally { + try { + metadataStream.close(); + } finally { + userStream.close(); + } + } + }); + } + public void collect(GCStatus status) { + Preconditions.checkState(hasCollected.compareAndSet(false, true), + "collect() has already been called on this object (which should only be called once)"); try { long count; long fileScanStop; @@ -216,7 +232,6 @@ public class GarbageCollectWriteAheadLogs { } finally { span5.end(); } - } catch (Exception e) { log.error("exception occurred while garbage collecting write ahead logs", e); } finally { @@ -302,30 +317,34 @@ public class GarbageCollectWriteAheadLogs { } // remove any entries if there's a log reference (recovery hasn't finished) - for (TabletLocationState state : store) { - // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it - // Easiest to just ignore all the WALs for the dead server. - if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { - Set<UUID> idsToIgnore = candidates.remove(state.current.getServerInstance()); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); - } - } - // Tablet is being recovered and has WAL references, remove all the WALs for the dead server - // that made the WALs. 
- for (Collection<String> wals : state.walogs) { - for (String wal : wals) { - UUID walUUID = path2uuid(new Path(wal)); - TServerInstance dead = result.get(walUUID); - // There's a reference to a log file, so skip that server's logs - Set<UUID> idsToIgnore = candidates.remove(dead); + try { + store.forEach(state -> { + // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it + // Easiest to just ignore all the WALs for the dead server. + if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { + Set<UUID> idsToIgnore = candidates.remove(state.current.getServerInstance()); if (idsToIgnore != null) { result.keySet().removeAll(idsToIgnore); recoveryLogs.keySet().removeAll(idsToIgnore); } } - } + // Tablet is being recovered and has WAL references, remove all the WALs for the dead server + // that made the WALs. + for (Collection<String> wals : state.walogs) { + for (String wal : wals) { + UUID walUUID = path2uuid(new Path(wal)); + TServerInstance dead = result.get(walUUID); + // There's a reference to a log file, so skip that server's logs + Set<UUID> idsToIgnore = candidates.remove(dead); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); + } + } + } + }); + } finally { + store.close(); } // Remove OPEN and CLOSED logs for live servers: they are still in use diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index d9c69da11f..544407b495 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Scanner; import 
org.apache.accumulo.core.data.Key; @@ -83,12 +84,11 @@ public class GarbageCollectWriteAheadLogsTest { } } - private final Iterable<TabletLocationState> tabletOnServer1List = - Collections.singletonList(tabletAssignedToServer1); - private final Iterable<TabletLocationState> tabletOnServer2List = - Collections.singletonList(tabletAssignedToServer2); - private final List<Entry<Key,Value>> emptyList = Collections.emptyList(); - private final Iterator<Entry<Key,Value>> emptyKV = emptyList.iterator(); + private final Stream<TabletLocationState> tabletOnServer1List = + Stream.of(tabletAssignedToServer1); + private final Stream<TabletLocationState> tabletOnServer2List = + Stream.of(tabletAssignedToServer2); + private final Iterator<Entry<Key,Value>> emptyKV = Collections.emptyIterator(); @Test public void testRemoveUnusedLog() throws Exception { @@ -109,8 +109,8 @@ public class GarbageCollectWriteAheadLogsTest { marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, - tserverSet, marker, tabletOnServer1List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, + tabletOnServer1List) { @Override @Deprecated protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) { @@ -142,8 +142,8 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); EasyMock.replay(context, marker, tserverSet, fs); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, - tserverSet, marker, tabletOnServer1List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, + tabletOnServer1List) { @Override @Deprecated protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) 
{ @@ -160,7 +160,7 @@ public class GarbageCollectWriteAheadLogsTest { } @Test - public void deleteUnreferenceLogOnDeadServer() throws Exception { + public void deleteUnreferencedLogOnDeadServer() throws Exception { ServerContext context = EasyMock.createMock(ServerContext.class); VolumeManager fs = EasyMock.createMock(VolumeManager.class); WalStateManager marker = EasyMock.createMock(WalStateManager.class); @@ -196,8 +196,8 @@ public class GarbageCollectWriteAheadLogsTest { marker.forget(server2); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, - tserverSet, marker, tabletOnServer1List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, + tabletOnServer1List) { @Override protected Map<UUID,Path> getSortedWALogs() { return Collections.emptyMap(); @@ -239,8 +239,8 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expectLastCall().once(); EasyMock.expect(mscanner.iterator()).andReturn(emptyKV); EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, - tserverSet, marker, tabletOnServer2List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, + tabletOnServer2List) { @Override protected Map<UUID,Path> getSortedWALogs() { return Collections.emptyMap(); @@ -287,8 +287,8 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expectLastCall().once(); EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator()); EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, - tserverSet, marker, tabletOnServer1List) { + var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker, + tabletOnServer1List) { 
@Override protected Map<UUID,Path> getSortedWALogs() { return Collections.emptyMap();