This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f5396dfa85b3e127c132a549cd98c53dfaca6b05 Merge: 00cb09f957 eda39bb1c1 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Fri Aug 16 17:19:54 2024 -0400 Merge remote-tracking branch 'upstream/main' into elasticity .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 28 +++++++----- .../gc/GarbageCollectWriteAheadLogsTest.java | 51 +++++++--------------- 2 files changed, 34 insertions(+), 45 deletions(-) diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 79cbe9535a,c636c34c4b..4d2f1b4dc6 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -72,7 -65,8 +72,7 @@@ public class GarbageCollectWriteAheadLo private final VolumeManager fs; private final LiveTServerSet liveServers; private final WalStateManager walMarker; - private final AtomicBoolean hasCollected; + private final AtomicBoolean hasCollected = new AtomicBoolean(false); - private final Stream<TabletLocationState> store; /** * Creates a new GC WAL object. @@@ -82,26 -76,44 +82,34 @@@ */ GarbageCollectWriteAheadLogs(final ServerContext context, final VolumeManager fs, final LiveTServerSet liveServers) { - this(context, fs, liveServers, new WalStateManager(context), createStore(context)); ++ this(context, fs, liveServers, new WalStateManager(context)); + } + + /** + * Creates a new GC WAL object. Meant for testing -- allows for mocked objects. + * + * + * @param context the collection server's context + * @param fs volume manager to use + * @param liveServers a started LiveTServerSet instance + * @param walMarker a WalStateManager instance - * @param store a stream of TabletLocationState objects + */ + GarbageCollectWriteAheadLogs(final ServerContext context, final VolumeManager fs, - final LiveTServerSet liveServers, final WalStateManager walMarker, - final Stream<TabletLocationState> store) { ++ final LiveTServerSet liveServers, final WalStateManager walMarker) { this.context = context; this.fs = fs; this.liveServers = liveServers; - this.walMarker = createWalStateManager(context); - this.hasCollected = new AtomicBoolean(false); - } - - @VisibleForTesting - WalStateManager createWalStateManager(ServerContext context) { - return new WalStateManager(context); + this.walMarker = walMarker; - this.store = store; } - private static Stream<TabletLocationState> createStore(final ServerContext context) { - var rootStream = TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).stream(); - var metadataStream = TabletStateStore.getStoreForLevel(DataLevel.METADATA, context).stream(); - var userStream = TabletStateStore.getStoreForLevel(DataLevel.USER, context).stream(); - return Streams.concat(rootStream, metadataStream, userStream).onClose(() -> { - try { - rootStream.close(); - } finally { - try { - metadataStream.close(); - } finally { - userStream.close(); - } - } - }); + @VisibleForTesting + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTServers) { + GcWalsFilter walsFilter = new GcWalsFilter(liveTServers); + + return Stream.of(DataLevel.ROOT, DataLevel.METADATA, DataLevel.USER) + .map(dataLevel -> context.getAmple().readTablets().forLevel(dataLevel).filter(walsFilter) + .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build()) + .flatMap(TabletsMetadata::stream); } public void collect(GCStatus status) { diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 7f4ec5173c,13c264a812..f868fd26d1 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@@ -100,26 -94,14 +100,21 @@@ public class GarbageCollectWriteAheadLo marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { - var gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; ++ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + - @Override - WalStateManager createWalStateManager(ServerContext serverContext) { - return marker; - } - + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer1List; + } + }; gc.collect(status); + assertThrows(IllegalStateException.class, () -> gc.collect(status), + "Should only be able to call collect once"); + EasyMock.verify(context, fs, marker, tserverSet); } @@@ -139,24 -121,14 +134,19 @@@ EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); EasyMock.replay(context, marker, tserverSet, fs); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { - var gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; ++ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + - @Override - WalStateManager createWalStateManager(ServerContext serverContext) { - return marker; - } - + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer1List; + } + }; gc.collect(status); + EasyMock.verify(context, marker, tserverSet, fs); } @@@ -183,24 -155,14 +173,19 @@@ marker.forget(server2); EasyMock.expectLastCall().once(); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { - var gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - }; ++ var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } + - @Override - WalStateManager createWalStateManager(ServerContext serverContext) { - return marker; - } - + @Override + Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { + return tabletOnServer1List; + } + }; gc.collect(status); + EasyMock.verify(context, fs, marker, tserverSet); } @@@ -221,24 -183,14 +206,20 @@@ EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); EasyMock.replay(context, fs, marker, tserverSet); - GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) { - @Override - protected Map<UUID,Path> getSortedWALogs() { - return Collections.emptyMap(); - } - - @Override - WalStateManager createWalStateManager(ServerContext serverContext) { - return marker; - } - - @Override - Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { - return tabletOnServer2List; - } - }; - var gc = - new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer2List) { ++ GarbageCollectWriteAheadLogs gc = ++ new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { + @Override + protected Map<UUID,Path> getSortedWALogs() { + return Collections.emptyMap(); + } ++ ++ @Override ++ Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) { ++ return tabletOnServer2List; ++ } + }; gc.collect(status); + EasyMock.verify(context, fs, marker, tserverSet); }