This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new fd111a22a6 Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)
fd111a22a6 is described below

commit fd111a22a6454837ad6076d5ba6fe54fc8934be5
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Fri Aug 16 15:48:58 2024 -0400

    Ensure resources are closed in GarbageCollectWriteAheadLogs (#4790)
    
    * Ensure resources are closed in GarbageCollectWriteAheadLogs
    * Adds a Stream<TabletLocationState> stream() method to TabletStateStore that closes the underlying iterator when the stream is closed
    
    ---------
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../server/manager/state/TabletStateStore.java     | 22 +++++
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  | 97 +++++++++++++---------
 .../gc/GarbageCollectWriteAheadLogsTest.java       | 34 ++++----
 3 files changed, 97 insertions(+), 56 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index 05072fd1b0..1894f7e7c7 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -18,10 +18,16 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -52,6 +58,22 @@ public interface TabletStateStore extends 
Iterable<TabletLocationState> {
   @Override
   ClosableIterator<TabletLocationState> iterator();
 
+  /**
+   * Create a stream of TabletLocationState that automatically closes the 
underlying iterator.
+   */
+  default Stream<TabletLocationState> stream() {
+    ClosableIterator<TabletLocationState> iterator = this.iterator();
+    return StreamSupport
+        .stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+        .onClose(() -> {
+          try {
+            iterator.close();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+  }
+
   /**
    * Store the assigned locations in the data store.
    */
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index ecfaeaba66..0fe4843ad1 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -57,8 +59,8 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Streams;
 
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -71,7 +73,8 @@ public class GarbageCollectWriteAheadLogs {
   private final boolean useTrash;
   private final LiveTServerSet liveServers;
   private final WalStateManager walMarker;
-  private final Iterable<TabletLocationState> store;
+  private final AtomicBoolean hasCollected = new AtomicBoolean(false);
+  private final Stream<TabletLocationState> store;
 
   /**
    * Creates a new GC WAL object.
@@ -82,38 +85,51 @@ public class GarbageCollectWriteAheadLogs {
    */
   GarbageCollectWriteAheadLogs(final ServerContext context, final 
VolumeManager fs,
       final LiveTServerSet liveServers, boolean useTrash) {
-    this.context = context;
-    this.fs = fs;
-    this.useTrash = useTrash;
-    this.liveServers = liveServers;
-    this.walMarker = new WalStateManager(context);
-    this.store = () -> Iterators.concat(
-        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.METADATA, 
context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+    this(context, fs, liveServers, useTrash, new WalStateManager(context), 
createStore(context));
   }
 
   /**
-   * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
+   * Creates a new GC WAL object. Meant for testing -- allows for mocked 
objects.
+   *
    *
    * @param context the collection server's context
    * @param fs volume manager to use
+   * @param liveServers a started LiveTServerSet instance
    * @param useTrash true to move files to trash rather than delete them
-   * @param liveTServerSet a started LiveTServerSet instance
+   * @param walMarker a WalStateManager instance
+   * @param store a stream of TabletLocationState objects
    */
-  @VisibleForTesting
-  GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs, 
boolean useTrash,
-      LiveTServerSet liveTServerSet, WalStateManager walMarker,
-      Iterable<TabletLocationState> store) {
+  GarbageCollectWriteAheadLogs(final ServerContext context, final 
VolumeManager fs,
+      final LiveTServerSet liveServers, boolean useTrash, final 
WalStateManager walMarker,
+      final Stream<TabletLocationState> store) {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
-    this.liveServers = liveTServerSet;
+    this.liveServers = liveServers;
     this.walMarker = walMarker;
     this.store = store;
   }
 
+  private static Stream<TabletLocationState> createStore(final ServerContext 
context) {
+    var rootStream = TabletStateStore.getStoreForLevel(DataLevel.ROOT, 
context).stream();
+    var metadataStream = TabletStateStore.getStoreForLevel(DataLevel.METADATA, 
context).stream();
+    var userStream = TabletStateStore.getStoreForLevel(DataLevel.USER, 
context).stream();
+    return Streams.concat(rootStream, metadataStream, userStream).onClose(() 
-> {
+      try {
+        rootStream.close();
+      } finally {
+        try {
+          metadataStream.close();
+        } finally {
+          userStream.close();
+        }
+      }
+    });
+  }
+
   public void collect(GCStatus status) {
+    Preconditions.checkState(hasCollected.compareAndSet(false, true),
+        "collect() has already been called on this object (which should only 
be called once)");
     try {
       long count;
       long fileScanStop;
@@ -216,7 +232,6 @@ public class GarbageCollectWriteAheadLogs {
       } finally {
         span5.end();
       }
-
     } catch (Exception e) {
       log.error("exception occurred while garbage collecting write ahead 
logs", e);
     } finally {
@@ -302,30 +317,34 @@ public class GarbageCollectWriteAheadLogs {
     }
 
     // remove any entries if there's a log reference (recovery hasn't finished)
-    for (TabletLocationState state : store) {
-      // Tablet is still assigned to a dead server. Manager has moved markers 
and reassigned it
-      // Easiest to just ignore all the WALs for the dead server.
-      if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
-        Set<UUID> idsToIgnore = 
candidates.remove(state.current.getServerInstance());
-        if (idsToIgnore != null) {
-          result.keySet().removeAll(idsToIgnore);
-          recoveryLogs.keySet().removeAll(idsToIgnore);
-        }
-      }
-      // Tablet is being recovered and has WAL references, remove all the WALs 
for the dead server
-      // that made the WALs.
-      for (Collection<String> wals : state.walogs) {
-        for (String wal : wals) {
-          UUID walUUID = path2uuid(new Path(wal));
-          TServerInstance dead = result.get(walUUID);
-          // There's a reference to a log file, so skip that server's logs
-          Set<UUID> idsToIgnore = candidates.remove(dead);
+    try {
+      store.forEach(state -> {
+        // Tablet is still assigned to a dead server. Manager has moved 
markers and reassigned it
+        // Easiest to just ignore all the WALs for the dead server.
+        if (state.getState(liveServers) == 
TabletState.ASSIGNED_TO_DEAD_SERVER) {
+          Set<UUID> idsToIgnore = 
candidates.remove(state.current.getServerInstance());
           if (idsToIgnore != null) {
             result.keySet().removeAll(idsToIgnore);
             recoveryLogs.keySet().removeAll(idsToIgnore);
           }
         }
-      }
+        // Tablet is being recovered and has WAL references, remove all the 
WALs for the dead server
+        // that made the WALs.
+        for (Collection<String> wals : state.walogs) {
+          for (String wal : wals) {
+            UUID walUUID = path2uuid(new Path(wal));
+            TServerInstance dead = result.get(walUUID);
+            // There's a reference to a log file, so skip that server's logs
+            Set<UUID> idsToIgnore = candidates.remove(dead);
+            if (idsToIgnore != null) {
+              result.keySet().removeAll(idsToIgnore);
+              recoveryLogs.keySet().removeAll(idsToIgnore);
+            }
+          }
+        }
+      });
+    } finally {
+      store.close();
     }
 
     // Remove OPEN and CLOSED logs for live servers: they are still in use
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index d9c69da11f..544407b495 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -83,12 +84,11 @@ public class GarbageCollectWriteAheadLogsTest {
     }
   }
 
-  private final Iterable<TabletLocationState> tabletOnServer1List =
-      Collections.singletonList(tabletAssignedToServer1);
-  private final Iterable<TabletLocationState> tabletOnServer2List =
-      Collections.singletonList(tabletAssignedToServer2);
-  private final List<Entry<Key,Value>> emptyList = Collections.emptyList();
-  private final Iterator<Entry<Key,Value>> emptyKV = emptyList.iterator();
+  private final Stream<TabletLocationState> tabletOnServer1List =
+      Stream.of(tabletAssignedToServer1);
+  private final Stream<TabletLocationState> tabletOnServer2List =
+      Stream.of(tabletAssignedToServer2);
+  private final Iterator<Entry<Key,Value>> emptyKV = 
Collections.emptyIterator();
 
   @Test
   public void testRemoveUnusedLog() throws Exception {
@@ -109,8 +109,8 @@ public class GarbageCollectWriteAheadLogsTest {
     marker.removeWalMarker(server1, id);
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List) {
+    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
+        tabletOnServer1List) {
       @Override
       @Deprecated
       protected int removeReplicationEntries(Map<UUID,TServerInstance> 
candidates) {
@@ -142,8 +142,8 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
     EasyMock.expect(marker.state(server1, id)).andReturn(new 
Pair<>(WalState.CLOSED, path));
     EasyMock.replay(context, marker, tserverSet, fs);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List) {
+    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
+        tabletOnServer1List) {
       @Override
       @Deprecated
       protected int removeReplicationEntries(Map<UUID,TServerInstance> 
candidates) {
@@ -160,7 +160,7 @@ public class GarbageCollectWriteAheadLogsTest {
   }
 
   @Test
-  public void deleteUnreferenceLogOnDeadServer() throws Exception {
+  public void deleteUnreferencedLogOnDeadServer() throws Exception {
     ServerContext context = EasyMock.createMock(ServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -196,8 +196,8 @@ public class GarbageCollectWriteAheadLogsTest {
     marker.forget(server2);
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List) {
+    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
+        tabletOnServer1List) {
       @Override
       protected Map<UUID,Path> getSortedWALogs() {
         return Collections.emptyMap();
@@ -239,8 +239,8 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expectLastCall().once();
     EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
     EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer2List) {
+    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
+        tabletOnServer2List) {
       @Override
       protected Map<UUID,Path> getSortedWALogs() {
         return Collections.emptyMap();
@@ -287,8 +287,8 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expectLastCall().once();
     
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
     EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List) {
+    var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, 
marker,
+        tabletOnServer1List) {
       @Override
       protected Map<UUID,Path> getSortedWALogs() {
         return Collections.emptyMap();

Reply via email to