This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new a089f86380 Replace iterator with stream to close resources in GarbageCollectWriteAheadLogs (#4207) a089f86380 is described below commit a089f86380e0283a0be21c5cb56b4b67c25627c6 Author: Dom G <domgargu...@apache.org> AuthorDate: Wed Feb 7 08:25:42 2024 -0500 Replace iterator with stream to close resources in GarbageCollectWriteAheadLogs (#4207) * make sure collect method can only be called once per object * make sure the underlying resources are closed after they are needed --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 97 ++++++++++--------- .../gc/GarbageCollectWriteAheadLogsTest.java | 106 ++++++++++++++------- 2 files changed, 126 insertions(+), 77 deletions(-) diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index c4403365ca..161e6c5754 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; @@ -42,6 +44,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; @@ -57,7 +60,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterators; +import com.google.common.base.Preconditions; +import com.google.common.collect.Streams; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -69,7 +73,7 @@ public class GarbageCollectWriteAheadLogs { private final VolumeManager fs; private final LiveTServerSet liveServers; private final WalStateManager walMarker; - private final Iterable<TabletMetadata> store; + private final AtomicBoolean hasCollected; /** * Creates a new GC WAL object. @@ -82,34 +86,32 @@ public class GarbageCollectWriteAheadLogs { this.context = context; this.fs = fs; this.liveServers = liveServers; - this.walMarker = new WalStateManager(context); - this.store = () -> Iterators.concat( - context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new HasWalsFilter()) - .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(), - context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new HasWalsFilter()) - .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(), - context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new HasWalsFilter()) - .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator()); + this.walMarker = createWalStateManager(context); + this.hasCollected = new AtomicBoolean(false); } - /** - * Creates a new GC WAL object. Meant for testing -- allows mocked objects. - * - * @param context the collection server's context - * @param fs volume manager to use - * @param liveTServerSet a started LiveTServerSet instance - */ @VisibleForTesting - GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs, - LiveTServerSet liveTServerSet, WalStateManager walMarker, Iterable<TabletMetadata> store) { - this.context = context; - this.fs = fs; - this.liveServers = liveTServerSet; - this.walMarker = walMarker; - this.store = store; + WalStateManager createWalStateManager(ServerContext context) { + return new WalStateManager(context); + } + + @VisibleForTesting + Stream<TabletMetadata> createStore() { + TabletsMetadata root = context.getAmple().readTablets().forLevel(DataLevel.ROOT) + .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build(); + TabletsMetadata metadata = context.getAmple().readTablets().forLevel(DataLevel.METADATA) + .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build(); + TabletsMetadata user = context.getAmple().readTablets().forLevel(DataLevel.USER) + .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build(); + return Streams.concat(root.stream(), metadata.stream(), user.stream()).onClose(() -> { + root.close(); + metadata.close(); + user.close(); + }); } public void collect(GCStatus status) { + Preconditions.checkState(!hasCollected.get(), "Can only call collect once per object"); try { long count; long fileScanStop; @@ -197,7 +199,7 @@ public class GarbageCollectWriteAheadLogs { } finally { span5.end(); } - + hasCollected.set(true); } catch (Exception e) { log.error("exception occurred while garbage collecting write ahead logs", e); } finally { @@ -279,28 +281,32 @@ public class GarbageCollectWriteAheadLogs { } // remove any entries if there's a log reference (recovery hasn't finished) - for (TabletMetadata tabletMetadata : store) { - // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it - // Easiest to just ignore all the WALs for the dead server. - if (TabletState.compute(tabletMetadata, liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { - Set<UUID> idsToIgnore = candidates.remove(tabletMetadata.getLocation().getServerInstance()); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); + try (Stream<TabletMetadata> store = createStore()) { + store.forEach(tabletMetadata -> { + // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it + // Easiest to just ignore all the WALs for the dead server. + if (TabletState.compute(tabletMetadata, liveServers) + == TabletState.ASSIGNED_TO_DEAD_SERVER) { + Set<UUID> idsToIgnore = + candidates.remove(tabletMetadata.getLocation().getServerInstance()); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); + } } - } - // Tablet is being recovered and has WAL references, remove all the WALs for the dead server - // that made the WALs. - for (LogEntry wal : tabletMetadata.getLogs()) { - UUID walUUID = wal.getUniqueID(); - TServerInstance dead = result.get(walUUID); - // There's a reference to a log file, so skip that server's logs - Set<UUID> idsToIgnore = candidates.remove(dead); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); + // Tablet is being recovered and has WAL references, remove all the WALs for the dead server + // that made the WALs. + for (LogEntry wal : tabletMetadata.getLogs()) { + UUID walUUID = wal.getUniqueID(); + TServerInstance dead = result.get(walUUID); + // There's a reference to a log file, so skip that server's logs + Set<UUID> idsToIgnore = candidates.remove(dead); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); + } } - } + }); } // Remove OPEN and CLOSED logs for live servers: they are still in use @@ -369,4 +375,5 @@ public class GarbageCollectWriteAheadLogs { } return result; } + } diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 53ab7bae1d..abfd09430d 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -21,11 +21,13 @@ package org.apache.accumulo.gc; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -74,10 +76,8 @@ public class GarbageCollectWriteAheadLogsTest { } } - private final Iterable<TabletMetadata> tabletOnServer1List = - Collections.singletonList(tabletAssignedToServer1); - private final Iterable<TabletMetadata> tabletOnServer2List = - Collections.singletonList(tabletAssignedToServer2); + private final Stream<TabletMetadata> tabletOnServer1List = Stream.of(tabletAssignedToServer1); + private final Stream<TabletMetadata> tabletOnServer2List = Stream.of(tabletAssignedToServer2); @Test public void testRemoveUnusedLog() throws Exception { @@ -99,14 +99,26 @@ public class GarbageCollectWriteAheadLogsTest { marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; + GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + + @Override + WalStateManager createWalStateManager(ServerContext serverContext) { + return marker; + } + + @Override + Stream<TabletMetadata> createStore() { + return tabletOnServer1List; + } + }; gc.collect(status); + assertThrows(IllegalStateException.class, () -> gc.collect(status), + "Should only be able to call collect once"); + EasyMock.verify(context, fs, marker, tserverSet); } @@ -126,14 +138,24 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); EasyMock.replay(context, marker, tserverSet, fs); - GarbageCollectWriteAheadLogs gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; + GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + + @Override + WalStateManager createWalStateManager(ServerContext serverContext) { + return marker; + } + + @Override + Stream<TabletMetadata> createStore() { + return tabletOnServer1List; + } + }; gc.collect(status); + EasyMock.verify(context, marker, tserverSet, fs); } @@ -160,14 +182,24 @@ public class GarbageCollectWriteAheadLogsTest { marker.forget(server2); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; + GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + + @Override + WalStateManager createWalStateManager(ServerContext serverContext) { + return marker; + } + + @Override + Stream<TabletMetadata> createStore() { + return tabletOnServer1List; + } + }; gc.collect(status); + EasyMock.verify(context, fs, marker, tserverSet); } @@ -188,14 +220,24 @@ public class GarbageCollectWriteAheadLogsTest { EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer2List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; + GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + + @Override + WalStateManager createWalStateManager(ServerContext serverContext) { + return marker; + } + + @Override + Stream<TabletMetadata> createStore() { + return tabletOnServer2List; + } + }; gc.collect(status); + EasyMock.verify(context, fs, marker, tserverSet); }