This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 16b5e3bb7b Create new internal GC class (#2670)
16b5e3bb7b is described below

commit 16b5e3bb7b3598896004f5648a9418f3e238d570
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Tue May 3 08:40:33 2022 -0400

    Create new internal GC class (#2670)
    
    * Pull out the GCEnv inner class of SimpleGarbageCollector into its own
    class and call it GCRun.
    * Keep the logic of GCEnv but make the stats private counters to allow
    dropping of the synchronized blocks
    * Create separate logger classes for each instance of GCRun
    * Make GCRun return the stats gathered during that run
    * Make SimpleGarbageCollector increment the current stats based on what
    is returned by the GCRun class
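
    A minimal, hypothetical sketch of the pattern described above: each GCRun
    owns private counters for its own cycle and exposes them through getters,
    and SimpleGarbageCollector folds those values into the shared cycle stats
    after each run returns, which is what lets the old synchronized blocks go
    away. Simplified names below, not the actual Accumulo classes:

        // Illustrative sketch only -- simplified, hypothetical classes.
        class StatsRun {                            // stands in for GCRun
          private long deleted = 0;                 // per-run counters, owned by this run,
          private long errors = 0;                  // so no synchronized block is needed

          void recordDeleted() { deleted++; }
          void recordError() { errors++; }

          long getDeletedStat() { return deleted; } // stats are handed back to the caller
          long getErrorsStat() { return errors; }
        }

        class Collector {                           // stands in for SimpleGarbageCollector
          private long totalDeleted = 0;            // cycle-wide totals
          private long totalErrors = 0;

          void runOnce(StatsRun run) {
            // ... collection work for this run happens here ...
            totalDeleted += run.getDeletedStat();   // aggregate after the run finishes
            totalErrors += run.getErrorsStat();
          }
        }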
---
 .../main/java/org/apache/accumulo/gc/GCRun.java    | 449 +++++++++++++++++++++
 .../apache/accumulo/gc/SimpleGarbageCollector.java | 382 ++----------------
 .../accumulo/gc/SimpleGarbageCollectorTest.java    |   6 +-
 3 files changed, 484 insertions(+), 353 deletions(-)

diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
new file mode 100644
index 0000000000..a4a80d99d4
--- /dev/null
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.gc;
+
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.TabletFileUtil;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.gc.GcVolumeUtil;
+import org.apache.accumulo.server.replication.proto.Replication;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A single garbage collection performed on a table (Root, MD) or all User tables.
+ */
+public class GCRun implements GarbageCollectionEnvironment {
+  private final Logger log;
+  private final Ample.DataLevel level;
+  private final ServerContext context;
+  private final AccumuloConfiguration config;
+  private long candidates = 0;
+  private long inUse = 0;
+  private long deleted = 0;
+  private long errors = 0;
+
+  GCRun(Ample.DataLevel level, ServerContext context) {
+    this.log = LoggerFactory.getLogger(level.name() + GCRun.class);
+    this.level = level;
+    this.context = context;
+    this.config = context.getConfiguration();
+  }
+
+  @Override
+  public Iterator<String> getCandidates() {
+    return context.getAmple().getGcCandidates(level);
+  }
+
+  @Override
+  public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
+    long candidateLength = 0;
+    // Converting the bytes to approximate number of characters for batch size.
+    long candidateBatchSize = getCandidateBatchSize() / 2;
+
+    List<String> candidatesBatch = new ArrayList<>();
+
+    while (candidates.hasNext()) {
+      String candidate = candidates.next();
+      candidateLength += candidate.length();
+      candidatesBatch.add(candidate);
+      if (candidateLength > candidateBatchSize) {
+        log.info("Candidate batch of size {} has exceeded the threshold. Attempting to delete "
+            + "what has been gathered so far.", candidateLength);
+        return candidatesBatch;
+      }
+    }
+    return candidatesBatch;
+  }
+
+  @Override
+  public Stream<String> getBlipPaths() throws TableNotFoundException {
+
+    if (level == Ample.DataLevel.ROOT) {
+      return Stream.empty();
+    }
+
+    int blipPrefixLen = MetadataSchema.BlipSection.getRowPrefix().length();
+    var scanner =
+        new IsolatedScanner(context.createScanner(level.metaTable(), Authorizations.EMPTY));
+    scanner.setRange(MetadataSchema.BlipSection.getRange());
+    return scanner.stream()
+        .map(entry -> entry.getKey().getRow().toString().substring(blipPrefixLen))
+        .onClose(scanner::close);
+  }
+
+  @Override
+  public Stream<Reference> getReferences() {
+    Stream<TabletMetadata> tabletStream;
+
+    if (level == Ample.DataLevel.ROOT) {
+      tabletStream = Stream.of(context.getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS));
+    } else {
+      tabletStream = TabletsMetadata.builder(context).scanTable(level.metaTable())
+          .checkConsistency().fetch(DIR, FILES, SCANS).build().stream();
+    }
+
+    return tabletStream.flatMap(tm -> {
+      Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream())
+          .map(f -> new Reference(tm.getTableId(), f.getMetaUpdateDelete(), false));
+      if (tm.getDirName() != null) {
+        refs =
+            Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDirName(), true)));
+      }
+      return refs;
+    });
+  }
+
+  @Override
+  public Set<TableId> getTableIDs() {
+    return context.getTableIdToNameMap().keySet();
+  }
+
+  @Override
+  public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException {
+    final VolumeManager fs = context.getVolumeManager();
+    var metadataLocation = level == Ample.DataLevel.ROOT
+        ? context.getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable();
+
+    if (inSafeMode()) {
+      System.out.println("SAFEMODE: There are " + confirmedDeletes.size()
+          + " data file candidates marked for deletion in " + metadataLocation + ".\n"
+          + "          Examine the log files to identify them.\n");
+      log.info("SAFEMODE: Listing all data file candidates for deletion");
+      for (String s : confirmedDeletes.values()) {
+        log.info("SAFEMODE: {}", s);
+      }
+      log.info("SAFEMODE: End candidates for deletion");
+      return;
+    }
+
+    List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>());
+
+    minimizeDeletes(confirmedDeletes, processedDeletes, fs, log);
+
+    ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(config, Property.GC_DELETE_THREADS, false);
+
+    final List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
+
+    for (final String delete : confirmedDeletes.values()) {
+
+      Runnable deleteTask = () -> {
+        boolean removeFlag = false;
+
+        try {
+          Path fullPath;
+          Path switchedDelete =
+              VolumeUtil.switchVolume(delete, VolumeManager.FileType.TABLE, replacements);
+          if (switchedDelete != null) {
+            // actually replacing the volumes in the metadata table would be tricky because the
+            // entries would be different rows. So it could not be
+            // atomically in one mutation and extreme care would need to be taken that delete
+            // entry was not lost. Instead of doing that, just deal with
+            // volume switching when something needs to be deleted. Since the rest of the code
+            // uses suffixes to compare delete entries, there is no danger
+            // of deleting something that should not be deleted. Must not change value of delete
+            // variable because that's what's stored in metadata table.
+            log.debug("Volume replaced {} -> {}", delete, switchedDelete);
+            fullPath = TabletFileUtil.validate(switchedDelete);
+          } else {
+            fullPath = new Path(TabletFileUtil.validate(delete));
+          }
+
+          for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) {
+            log.debug("Deleting {}", pathToDel);
+
+            if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) {
+              // delete succeeded, still want to delete
+              removeFlag = true;
+              deleted++;
+            } else if (fs.exists(pathToDel)) {
+              // leave the entry in the metadata; we'll try again later
+              removeFlag = false;
+              errors++;
+              log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel);
+              break;
+            } else {
+              // this failure, we still want to remove the metadata entry
+              removeFlag = true;
+              errors++;
+              String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/");
+              if (parts.length > 2) {
+                TableId tableId = TableId.of(parts[1]);
+                String tabletDir = parts[2];
+                context.getTableManager().updateTableStateCache(tableId);
+                TableState tableState = context.getTableManager().getTableState(tableId);
+                if (tableState != null && tableState != TableState.DELETING) {
+                  // clone directories don't always exist
+                  if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) {
+                    log.debug("File doesn't exist: {}", pathToDel);
+                  }
+                }
+              } else {
+                log.warn("Very strange path name: {}", delete);
+              }
+            }
+          }
+
+          // proceed to clearing out the flags for successful deletes and
+          // non-existent files
+          if (removeFlag) {
+            processedDeletes.add(delete);
+          }
+        } catch (Exception e) {
+          log.error("{}", e.getMessage(), e);
+        }
+
+      };
+
+      deleteThreadPool.execute(deleteTask);
+    }
+
+    deleteThreadPool.shutdown();
+
+    try {
+      while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty
+      }
+    } catch (InterruptedException e1) {
+      log.error("{}", e1.getMessage(), e1);
+    }
+
+    context.getAmple().deleteGcCandidates(level, processedDeletes);
+  }
+
+  @Override
+  public void deleteTableDirIfEmpty(TableId tableID) throws IOException {
+    final VolumeManager fs = context.getVolumeManager();
+    // if dir exist and is empty, then empty list is returned...
+    // hadoop 2.0 will throw an exception if the file does not exist
+    for (String dir : context.getTablesDirs()) {
+      FileStatus[] tabletDirs;
+      try {
+        tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
+      } catch (FileNotFoundException ex) {
+        continue;
+      }
+
+      if (tabletDirs.length == 0) {
+        Path p = new Path(dir + "/" + tableID);
+        log.debug("Removing table dir {}", p);
+        if (!moveToTrash(p)) {
+          fs.delete(p);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void incrementCandidatesStat(long i) {
+    candidates += i;
+  }
+
+  @Override
+  public void incrementInUseStat(long i) {
+    inUse += i;
+  }
+
+  @Override
+  @Deprecated
+  public Iterator<Map.Entry<String,Replication.Status>> getReplicationNeededIterator() {
+    AccumuloClient client = context;
+    try {
+      Scanner s = org.apache.accumulo.core.replication.ReplicationTable.getScanner(client);
+      org.apache.accumulo.core.replication.ReplicationSchema.StatusSection.limit(s);
+      return Iterators.transform(s.iterator(), input -> {
+        String file = input.getKey().getRow().toString();
+        Replication.Status stat;
+        try {
+          stat = Replication.Status.parseFrom(input.getValue().get());
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf for: {}", input.getKey());
+          stat = null;
+        }
+        return Maps.immutableEntry(file, stat);
+      });
+    } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) {
+      // No elements that we need to preclude
+      return Collections.emptyIterator();
+    }
+  }
+
+  @VisibleForTesting
+  static void minimizeDeletes(SortedMap<String,String> confirmedDeletes,
+      List<String> processedDeletes, VolumeManager fs, Logger logger) {
+    Set<Path> seenVolumes = new HashSet<>();
+
+    // when deleting a dir and all files in that dir, only need to delete the dir.
+    // The dir will sort right before the files... so remove the files in this case
+    // to minimize namenode ops
+    Iterator<Map.Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator();
+
+    String lastDirRel = null;
+    Path lastDirAbs = null;
+    while (cdIter.hasNext()) {
+      Map.Entry<String,String> entry = cdIter.next();
+      String relPath = entry.getKey();
+      Path absPath = new Path(entry.getValue());
+
+      if (SimpleGarbageCollector.isDir(relPath)) {
+        lastDirRel = relPath;
+        lastDirAbs = absPath;
+      } else if (lastDirRel != null) {
+        if (relPath.startsWith(lastDirRel)) {
+          Path vol = VolumeManager.FileType.TABLE.getVolume(absPath);
+
+          boolean sameVol = false;
+
+          if (GcVolumeUtil.isAllVolumesUri(lastDirAbs)) {
+            if (seenVolumes.contains(vol)) {
+              sameVol = true;
+            } else {
+              for (Volume cvol : fs.getVolumes()) {
+                if (cvol.containsPath(vol)) {
+                  seenVolumes.add(vol);
+                  sameVol = true;
+                }
+              }
+            }
+          } else {
+            sameVol = Objects.equals(VolumeManager.FileType.TABLE.getVolume(lastDirAbs), vol);
+          }
+
+          if (sameVol) {
+            logger.info("Ignoring {} because {} exist", entry.getValue(), lastDirAbs);
+            processedDeletes.add(entry.getValue());
+            cdIter.remove();
+          }
+        } else {
+          lastDirRel = null;
+          lastDirAbs = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks if safemode is set - files will not be deleted.
+   *
+   * @return true if safemode is set
+   */
+  boolean inSafeMode() {
+    return context.getConfiguration().getBoolean(Property.GC_SAFEMODE);
+  }
+
+  /**
+   * Moves a file to trash. If this garbage collector is not using trash, this method returns false
+   * and leaves the file alone. If the file is missing, this method returns false as opposed to
+   * throwing an exception.
+   *
+   * @return true if the file was moved to trash
+   * @throws IOException
+   *           if the volume manager encountered a problem
+   */
+  boolean moveToTrash(Path path) throws IOException {
+    final VolumeManager fs = context.getVolumeManager();
+    if (!isUsingTrash()) {
+      return false;
+    }
+    try {
+      return fs.moveToTrash(path);
+    } catch (FileNotFoundException ex) {
+      return false;
+    }
+  }
+
+  /**
+   * Checks if the volume manager should move files to the trash rather than delete them.
+   *
+   * @return true if trash is used
+   */
+  boolean isUsingTrash() {
+    return !config.getBoolean(Property.GC_TRASH_IGNORE);
+  }
+
+  /**
+   * Gets the batch size for garbage collecting.
+   *
+   * @return candidate batch size.
+   */
+  long getCandidateBatchSize() {
+    return config.getAsBytes(Property.GC_CANDIDATE_BATCH_SIZE);
+  }
+
+  public long getInUseStat() {
+    return inUse;
+  }
+
+  public long getDeletedStat() {
+    return deleted;
+  }
+
+  public long getErrorsStat() {
+    return errors;
+  }
+
+  public long getCandidatesStat() {
+    return candidates;
+  }
+}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index f42cfdf463..0759961d08 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -18,60 +18,33 @@
  */
 package org.apache.accumulo.gc;
 
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedMap;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TabletFileUtil;
-import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metrics.MetricsUtil;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
@@ -80,26 +53,16 @@ import org.apache.accumulo.gc.metrics.GcMetrics;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.fs.VolumeUtil;
-import org.apache.accumulo.server.gc.GcVolumeUtil;
 import org.apache.accumulo.server.manager.LiveTServerSet;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TProcessor;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -183,260 +146,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
     return getConfiguration().getBoolean(Property.GC_SAFEMODE);
   }
 
-  private class GCEnv implements GarbageCollectionEnvironment {
-
-    private final DataLevel level;
-
-    GCEnv(Ample.DataLevel level) {
-      this.level = level;
-    }
-
-    @Override
-    public Iterator<String> getCandidates() {
-      return getContext().getAmple().getGcCandidates(level);
-    }
-
-    @Override
-    public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
-      long candidateLength = 0;
-      // Converting the bytes to approximate number of characters for batch size.
-      long candidateBatchSize = getCandidateBatchSize() / 2;
-
-      List<String> candidatesBatch = new ArrayList<>();
-
-      while (candidates.hasNext()) {
-        String candidate = candidates.next();
-        candidateLength += candidate.length();
-        candidatesBatch.add(candidate);
-        if (candidateLength > candidateBatchSize) {
-        log.info("Candidate batch of size {} has exceeded the threshold. Attempting to delete "
-              + "what has been gathered so far.", candidateLength);
-          return candidatesBatch;
-        }
-      }
-      return candidatesBatch;
-    }
-
-    @Override
-    public Stream<String> getBlipPaths() throws TableNotFoundException {
-
-      if (level == DataLevel.ROOT) {
-        return Stream.empty();
-      }
-
-      int blipPrefixLen = BlipSection.getRowPrefix().length();
-      var scanner =
-          new IsolatedScanner(getContext().createScanner(level.metaTable(), Authorizations.EMPTY));
-      scanner.setRange(BlipSection.getRange());
-      return scanner.stream()
-          .map(entry -> entry.getKey().getRow().toString().substring(blipPrefixLen))
-          .onClose(scanner::close);
-    }
-
-    @Override
-    public Stream<Reference> getReferences() {
-
-      Stream<TabletMetadata> tabletStream;
-
-      if (level == DataLevel.ROOT) {
-        tabletStream =
-            Stream.of(getContext().getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS));
-      } else {
-        tabletStream = TabletsMetadata.builder(getContext()).scanTable(level.metaTable())
-            .checkConsistency().fetch(DIR, FILES, SCANS).build().stream();
-      }
-
-      return tabletStream.flatMap(tm -> {
-        Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream())
-            .map(f -> new Reference(tm.getTableId(), f.getMetaUpdateDelete(), false));
-        if (tm.getDirName() != null) {
-          refs =
-              Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDirName(), true)));
-        }
-        return refs;
-      });
-    }
-
-    @Override
-    public Set<TableId> getTableIDs() {
-      return getContext().getTableIdToNameMap().keySet();
-    }
-
-    @Override
-    public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException {
-      final VolumeManager fs = getContext().getVolumeManager();
-      var metadataLocation = level == DataLevel.ROOT
-          ? getContext().getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable();
-
-      if (inSafeMode()) {
-        System.out.println("SAFEMODE: There are " + confirmedDeletes.size()
-            + " data file candidates marked for deletion in " + metadataLocation + ".\n"
-            + "          Examine the log files to identify them.\n");
-        log.info("SAFEMODE: Listing all data file candidates for deletion");
-        for (String s : confirmedDeletes.values()) {
-          log.info("SAFEMODE: {}", s);
-        }
-        log.info("SAFEMODE: End candidates for deletion");
-        return;
-      }
-
-      List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>());
-
-      minimizeDeletes(confirmedDeletes, processedDeletes, fs);
-
-      ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
-          .createExecutorService(getConfiguration(), Property.GC_DELETE_THREADS, false);
-
-      final List<Pair<Path,Path>> replacements = getContext().getVolumeReplacements();
-
-      for (final String delete : confirmedDeletes.values()) {
-
-        Runnable deleteTask = () -> {
-          boolean removeFlag = false;
-
-          try {
-            Path fullPath;
-            Path switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements);
-            if (switchedDelete != null) {
-              // actually replacing the volumes in the metadata table would be tricky because the
-              // entries would be different rows. So it could not be
-              // atomically in one mutation and extreme care would need to be taken that delete
-              // entry was not lost. Instead of doing that, just deal with
-              // volume switching when something needs to be deleted. Since the rest of the code
-              // uses suffixes to compare delete entries, there is no danger
-              // of deleting something that should not be deleted. Must not change value of delete
-              // variable because that's what's stored in metadata table.
-              log.debug("Volume replaced {} -> {}", delete, switchedDelete);
-              fullPath = TabletFileUtil.validate(switchedDelete);
-            } else {
-              fullPath = new Path(TabletFileUtil.validate(delete));
-            }
-
-            for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) {
-              log.debug("Deleting {}", pathToDel);
-
-              if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) {
-                // delete succeeded, still want to delete
-                removeFlag = true;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.deleted;
-                }
-              } else if (fs.exists(pathToDel)) {
-                // leave the entry in the metadata; we'll try again later
-                removeFlag = false;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.errors;
-                }
-                log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel);
-                break;
-              } else {
-                // this failure, we still want to remove the metadata entry
-                removeFlag = true;
-                synchronized (SimpleGarbageCollector.this) {
-                  ++status.current.errors;
-                }
-                String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/");
-                if (parts.length > 2) {
-                  TableId tableId = TableId.of(parts[1]);
-                  String tabletDir = parts[2];
-                  getContext().getTableManager().updateTableStateCache(tableId);
-                  TableState tableState = getContext().getTableManager().getTableState(tableId);
-                  if (tableState != null && tableState != TableState.DELETING) {
-                    // clone directories don't always exist
-                    if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) {
-                      log.debug("File doesn't exist: {}", pathToDel);
-                    }
-                  }
-                } else {
-                  log.warn("Very strange path name: {}", delete);
-                }
-              }
-            }
-
-            // proceed to clearing out the flags for successful deletes and
-            // non-existent files
-            if (removeFlag) {
-              processedDeletes.add(delete);
-            }
-          } catch (Exception e) {
-            log.error("{}", e.getMessage(), e);
-          }
-
-        };
-
-        deleteThreadPool.execute(deleteTask);
-      }
-
-      deleteThreadPool.shutdown();
-
-      try {
-        while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty
-        }
-      } catch (InterruptedException e1) {
-        log.error("{}", e1.getMessage(), e1);
-      }
-
-      getContext().getAmple().deleteGcCandidates(level, processedDeletes);
-    }
-
-    @Override
-    public void deleteTableDirIfEmpty(TableId tableID) throws IOException {
-      final VolumeManager fs = getContext().getVolumeManager();
-      // if dir exist and is empty, then empty list is returned...
-      // hadoop 2.0 will throw an exception if the file does not exist
-      for (String dir : getContext().getTablesDirs()) {
-        FileStatus[] tabletDirs;
-        try {
-          tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
-        } catch (FileNotFoundException ex) {
-          continue;
-        }
-
-        if (tabletDirs.length == 0) {
-          Path p = new Path(dir + "/" + tableID);
-          log.debug("Removing table dir {}", p);
-          if (!moveToTrash(p)) {
-            fs.delete(p);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void incrementCandidatesStat(long i) {
-      status.current.candidates += i;
-    }
-
-    @Override
-    public void incrementInUseStat(long i) {
-      status.current.inUse += i;
-    }
-
-    @Override
-    @Deprecated
-    public Iterator<Entry<String,Status>> getReplicationNeededIterator() {
-      AccumuloClient client = getContext();
-      try {
-        Scanner s = org.apache.accumulo.core.replication.ReplicationTable.getScanner(client);
-        org.apache.accumulo.core.replication.ReplicationSchema.StatusSection.limit(s);
-        return Iterators.transform(s.iterator(), input -> {
-          String file = input.getKey().getRow().toString();
-          Status stat;
-          try {
-            stat = Status.parseFrom(input.getValue().get());
-          } catch (InvalidProtocolBufferException e) {
-            log.warn("Could not deserialize protobuf for: {}", input.getKey());
-            stat = null;
-          }
-          return Maps.immutableEntry(file, stat);
-        });
-      } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) {
-        // No elements that we need to preclude
-        return Collections.emptyIterator();
-      }
-    }
-  }
-
   @Override
   @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
   public void run() {
@@ -493,15 +202,24 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
             System.gc(); // make room
 
             status.current.started = System.currentTimeMillis();
+            var rootGC = new GCRun(DataLevel.ROOT, getContext());
+            var mdGC = new GCRun(DataLevel.METADATA, getContext());
+            var userGC = new GCRun(DataLevel.USER, getContext());
+
+            log.info("Starting Root table Garbage Collection.");
+            new GarbageCollectionAlgorithm().collect(rootGC);
+            incrementStatsForRun(rootGC);
+            logStats();
 
-            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.ROOT));
-            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.METADATA));
-            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.USER));
+            log.info("Starting Metadata table Garbage Collection.");
+            new GarbageCollectionAlgorithm().collect(mdGC);
+            incrementStatsForRun(mdGC);
+            logStats();
 
-            log.info("Number of data file candidates for deletion: {}", status.current.candidates);
-            log.info("Number of data file candidates still in use: {}", status.current.inUse);
-            log.info("Number of successfully deleted data files: {}", status.current.deleted);
-            log.info("Number of data files delete failures: {}", status.current.errors);
+            log.info("Starting User table Garbage Collection.");
+            new GarbageCollectionAlgorithm().collect(userGC);
+            incrementStatsForRun(userGC);
+            logStats();
 
             status.current.finished = System.currentTimeMillis();
             status.last = status.current;
@@ -608,6 +326,20 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
     }
   }
 
+  private void incrementStatsForRun(GCRun gcRun) {
+    status.current.candidates += gcRun.getCandidatesStat();
+    status.current.inUse += gcRun.getInUseStat();
+    status.current.deleted += gcRun.getDeletedStat();
+    status.current.errors += gcRun.getErrorsStat();
+  }
+
+  private void logStats() {
+    log.info("Number of data file candidates for deletion: {}", status.current.candidates);
+    log.info("Number of data file candidates still in use: {}", status.current.inUse);
+    log.info("Number of successfully deleted data files: {}", status.current.deleted);
+    log.info("Number of data files delete failures: {}", status.current.errors);
+  }
+
   /**
    * Moves a file to trash. If this garbage collector is not using trash, this method returns false
    * and leaves the file alone. If the file is missing, this method returns false as opposed to
@@ -702,60 +434,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
     return slashCount == 1;
   }
 
-  @VisibleForTesting
-  static void minimizeDeletes(SortedMap<String,String> confirmedDeletes,
-      List<String> processedDeletes, VolumeManager fs) {
-    Set<Path> seenVolumes = new HashSet<>();
-
-    // when deleting a dir and all files in that dir, only need to delete the dir.
-    // The dir will sort right before the files... so remove the files in this case
-    // to minimize namenode ops
-    Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator();
-
-    String lastDirRel = null;
-    Path lastDirAbs = null;
-    while (cdIter.hasNext()) {
-      Entry<String,String> entry = cdIter.next();
-      String relPath = entry.getKey();
-      Path absPath = new Path(entry.getValue());
-
-      if (isDir(relPath)) {
-        lastDirRel = relPath;
-        lastDirAbs = absPath;
-      } else if (lastDirRel != null) {
-        if (relPath.startsWith(lastDirRel)) {
-          Path vol = FileType.TABLE.getVolume(absPath);
-
-          boolean sameVol = false;
-
-          if (GcVolumeUtil.isAllVolumesUri(lastDirAbs)) {
-            if (seenVolumes.contains(vol)) {
-              sameVol = true;
-            } else {
-              for (Volume cvol : fs.getVolumes()) {
-                if (cvol.containsPath(vol)) {
-                  seenVolumes.add(vol);
-                  sameVol = true;
-                }
-              }
-            }
-          } else {
-            sameVol = Objects.equals(FileType.TABLE.getVolume(lastDirAbs), vol);
-          }
-
-          if (sameVol) {
-            log.info("Ignoring {} because {} exist", entry.getValue(), lastDirAbs);
-            processedDeletes.add(entry.getValue());
-            cdIter.remove();
-          }
-        } else {
-          lastDirRel = null;
-          lastDirAbs = null;
-        }
-      }
-    }
-  }
-
   @Override
   public GCStatus getStatus(TInfo info, TCredentials credentials) {
     return status;
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index 337a816ee0..4775825611 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -53,8 +53,12 @@ import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SimpleGarbageCollectorTest {
+  private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollectorTest.class);
+
   private VolumeManager volMgr;
   private ServerContext context;
   private Credentials credentials;
@@ -178,7 +182,7 @@ public class SimpleGarbageCollectorTest {
 
     List<String> processedDeletes = new ArrayList<>();
 
-    SimpleGarbageCollector.minimizeDeletes(confirmed, processedDeletes, volMgr2);
+    GCRun.minimizeDeletes(confirmed, processedDeletes, volMgr2, log);
 
     TreeMap<String,String> expected = new TreeMap<>();
     expected.put("5a/t-0001", "hdfs://nn1/accumulo/tables/5a/t-0001");
