This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new a089f86380 Replace iterator with stream to close resources in GarbageCollectWriteAheadLogs (#4207)
a089f86380 is described below

commit a089f86380e0283a0be21c5cb56b4b67c25627c6
Author: Dom G <domgargu...@apache.org>
AuthorDate: Wed Feb 7 08:25:42 2024 -0500

    Replace iterator with stream to close resources in GarbageCollectWriteAheadLogs (#4207)
    
    * make sure collect method can only be called once per object
    * make sure the underlying resources are closed after they are needed
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |  97 ++++++++++---------
 .../gc/GarbageCollectWriteAheadLogsTest.java       | 106 ++++++++++++++-------
 2 files changed, 126 insertions(+), 77 deletions(-)

diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index c4403365ca..161e6c5754 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@ -42,6 +44,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Pair;
@@ -57,7 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Streams;
 
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -69,7 +73,7 @@ public class GarbageCollectWriteAheadLogs {
   private final VolumeManager fs;
   private final LiveTServerSet liveServers;
   private final WalStateManager walMarker;
-  private final Iterable<TabletMetadata> store;
+  private final AtomicBoolean hasCollected;
 
   /**
    * Creates a new GC WAL object.
@@ -82,34 +86,32 @@ public class GarbageCollectWriteAheadLogs {
     this.context = context;
     this.fs = fs;
     this.liveServers = liveServers;
-    this.walMarker = new WalStateManager(context);
-    this.store = () -> Iterators.concat(
-        context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new HasWalsFilter())
-            .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(),
-        context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new HasWalsFilter())
-            .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(),
-        context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new HasWalsFilter())
-            .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator());
+    this.walMarker = createWalStateManager(context);
+    this.hasCollected = new AtomicBoolean(false);
   }
 
-  /**
-   * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
-   *
-   * @param context the collection server's context
-   * @param fs volume manager to use
-   * @param liveTServerSet a started LiveTServerSet instance
-   */
   @VisibleForTesting
-  GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs,
-      LiveTServerSet liveTServerSet, WalStateManager walMarker, Iterable<TabletMetadata> store) {
-    this.context = context;
-    this.fs = fs;
-    this.liveServers = liveTServerSet;
-    this.walMarker = walMarker;
-    this.store = store;
+  WalStateManager createWalStateManager(ServerContext context) {
+    return new WalStateManager(context);
+  }
+
+  @VisibleForTesting
+  Stream<TabletMetadata> createStore() {
+    TabletsMetadata root = context.getAmple().readTablets().forLevel(DataLevel.ROOT)
+        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+    TabletsMetadata metadata = context.getAmple().readTablets().forLevel(DataLevel.METADATA)
+        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+    TabletsMetadata user = context.getAmple().readTablets().forLevel(DataLevel.USER)
+        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+    return Streams.concat(root.stream(), metadata.stream(), user.stream()).onClose(() -> {
+      root.close();
+      metadata.close();
+      user.close();
+    });
   }
 
   public void collect(GCStatus status) {
+    Preconditions.checkState(!hasCollected.get(), "Can only call collect once per object");
     try {
       long count;
       long fileScanStop;
@@ -197,7 +199,7 @@ public class GarbageCollectWriteAheadLogs {
       } finally {
         span5.end();
       }
-
+      hasCollected.set(true);
     } catch (Exception e) {
       log.error("exception occurred while garbage collecting write ahead logs", e);
     } finally {
@@ -279,28 +281,32 @@ public class GarbageCollectWriteAheadLogs {
     }
 
     // remove any entries if there's a log reference (recovery hasn't finished)
-    for (TabletMetadata tabletMetadata : store) {
-      // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
-      // Easiest to just ignore all the WALs for the dead server.
-      if (TabletState.compute(tabletMetadata, liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
-        Set<UUID> idsToIgnore = candidates.remove(tabletMetadata.getLocation().getServerInstance());
-        if (idsToIgnore != null) {
-          result.keySet().removeAll(idsToIgnore);
-          recoveryLogs.keySet().removeAll(idsToIgnore);
+    try (Stream<TabletMetadata> store = createStore()) {
+      store.forEach(tabletMetadata -> {
+        // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
+        // Easiest to just ignore all the WALs for the dead server.
+        if (TabletState.compute(tabletMetadata, liveServers)
+            == TabletState.ASSIGNED_TO_DEAD_SERVER) {
+          Set<UUID> idsToIgnore =
+              candidates.remove(tabletMetadata.getLocation().getServerInstance());
+          if (idsToIgnore != null) {
+            result.keySet().removeAll(idsToIgnore);
+            recoveryLogs.keySet().removeAll(idsToIgnore);
+          }
         }
-      }
-      // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
-      // that made the WALs.
-      for (LogEntry wal : tabletMetadata.getLogs()) {
-        UUID walUUID = wal.getUniqueID();
-        TServerInstance dead = result.get(walUUID);
-        // There's a reference to a log file, so skip that server's logs
-        Set<UUID> idsToIgnore = candidates.remove(dead);
-        if (idsToIgnore != null) {
-          result.keySet().removeAll(idsToIgnore);
-          recoveryLogs.keySet().removeAll(idsToIgnore);
+        // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
+        // that made the WALs.
+        for (LogEntry wal : tabletMetadata.getLogs()) {
+          UUID walUUID = wal.getUniqueID();
+          TServerInstance dead = result.get(walUUID);
+          // There's a reference to a log file, so skip that server's logs
+          Set<UUID> idsToIgnore = candidates.remove(dead);
+          if (idsToIgnore != null) {
+            result.keySet().removeAll(idsToIgnore);
+            recoveryLogs.keySet().removeAll(idsToIgnore);
+          }
         }
-      }
+      });
     }
 
     // Remove OPEN and CLOSED logs for live servers: they are still in use
@@ -369,4 +375,5 @@ public class GarbageCollectWriteAheadLogs {
     }
     return result;
   }
+
 }
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 53ab7bae1d..abfd09430d 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -21,11 +21,13 @@ package org.apache.accumulo.gc;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -74,10 +76,8 @@ public class GarbageCollectWriteAheadLogsTest {
     }
   }
 
-  private final Iterable<TabletMetadata> tabletOnServer1List =
-      Collections.singletonList(tabletAssignedToServer1);
-  private final Iterable<TabletMetadata> tabletOnServer2List =
-      Collections.singletonList(tabletAssignedToServer2);
+  private final Stream<TabletMetadata> tabletOnServer1List = Stream.of(tabletAssignedToServer1);
+  private final Stream<TabletMetadata> tabletOnServer2List = Stream.of(tabletAssignedToServer2);
 
   @Test
   public void testRemoveUnusedLog() throws Exception {
@@ -99,14 +99,26 @@ public class GarbageCollectWriteAheadLogsTest {
     marker.removeWalMarker(server1, id);
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet);
-    GarbageCollectWriteAheadLogs gc =
-        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
-          @Override
-          protected Map<UUID,Path> getSortedWALogs() {
-            return Collections.emptyMap();
-          }
-        };
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      WalStateManager createWalStateManager(ServerContext serverContext) {
+        return marker;
+      }
+
+      @Override
+      Stream<TabletMetadata> createStore() {
+        return tabletOnServer1List;
+      }
+    };
     gc.collect(status);
+    assertThrows(IllegalStateException.class, () -> gc.collect(status),
+        "Should only be able to call collect once");
+
     EasyMock.verify(context, fs, marker, tserverSet);
   }
 
@@ -126,14 +138,24 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
     EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
     EasyMock.replay(context, marker, tserverSet, fs);
-    GarbageCollectWriteAheadLogs gc =
-        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
-          @Override
-          protected Map<UUID,Path> getSortedWALogs() {
-            return Collections.emptyMap();
-          }
-        };
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      WalStateManager createWalStateManager(ServerContext serverContext) {
+        return marker;
+      }
+
+      @Override
+      Stream<TabletMetadata> createStore() {
+        return tabletOnServer1List;
+      }
+    };
     gc.collect(status);
+
     EasyMock.verify(context, marker, tserverSet, fs);
   }
 
@@ -160,14 +182,24 @@ public class GarbageCollectWriteAheadLogsTest {
     marker.forget(server2);
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet);
-    GarbageCollectWriteAheadLogs gc =
-        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
-          @Override
-          protected Map<UUID,Path> getSortedWALogs() {
-            return Collections.emptyMap();
-          }
-        };
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      WalStateManager createWalStateManager(ServerContext serverContext) {
+        return marker;
+      }
+
+      @Override
+      Stream<TabletMetadata> createStore() {
+        return tabletOnServer1List;
+      }
+    };
     gc.collect(status);
+
     EasyMock.verify(context, fs, marker, tserverSet);
   }
 
@@ -188,14 +220,24 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
 
     EasyMock.replay(context, fs, marker, tserverSet);
-    GarbageCollectWriteAheadLogs gc =
-        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer2List) {
-          @Override
-          protected Map<UUID,Path> getSortedWALogs() {
-            return Collections.emptyMap();
-          }
-        };
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      WalStateManager createWalStateManager(ServerContext serverContext) {
+        return marker;
+      }
+
+      @Override
+      Stream<TabletMetadata> createStore() {
+        return tabletOnServer2List;
+      }
+    };
     gc.collect(status);
+
     EasyMock.verify(context, fs, marker, tserverSet);
   }
 

Reply via email to