This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 16b5e3bb7b Create new internal GC class (#2670) 16b5e3bb7b is described below commit 16b5e3bb7b3598896004f5648a9418f3e238d570 Author: Mike Miller <mmil...@apache.org> AuthorDate: Tue May 3 08:40:33 2022 -0400 Create new internal GC class (#2670) * Pull out the GCEnv inner class of SimpleGarbageCollector into its own class and call it GCRun. * Keep the logic of GCEnv but make the stats private counters, allowing the synchronized blocks to be dropped. * Create a separate logger for each instance of GCRun. * Make GCRun return the stats gathered during that run. * Make SimpleGarbageCollector increment its current stats based on what is returned by the GCRun class. --- .../main/java/org/apache/accumulo/gc/GCRun.java | 449 +++++++++++++++++++++ .../apache/accumulo/gc/SimpleGarbageCollector.java | 382 ++---------------- .../accumulo/gc/SimpleGarbageCollectorTest.java | 6 +- 3 files changed, 484 insertions(+), 353 deletions(-) diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java new file mode 100644 index 0000000000..a4a80d99d4 --- /dev/null +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.gc; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.TabletFileUtil; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeUtil; +import org.apache.accumulo.server.gc.GcVolumeUtil; +import org.apache.accumulo.server.replication.proto.Replication; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * A single garbage collection performed on a table (Root, MD) or all User tables. + */ +public class GCRun implements GarbageCollectionEnvironment { + private final Logger log; + private final Ample.DataLevel level; + private final ServerContext context; + private final AccumuloConfiguration config; + private long candidates = 0; + private long inUse = 0; + private long deleted = 0; + private long errors = 0; + + GCRun(Ample.DataLevel level, ServerContext context) { + this.log = LoggerFactory.getLogger(level.name() + GCRun.class); + this.level = level; + this.context = context; + this.config = context.getConfiguration(); + } + + @Override + public Iterator<String> getCandidates() { + return context.getAmple().getGcCandidates(level); + } + + @Override + public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) { + long candidateLength = 0; + // Converting the bytes to approximate number of characters for batch size. 
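+ // A Java char occupies two bytes, so halving the configured byte threshold + // approximates the number of characters that will fit in one batch.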
+ long candidateBatchSize = getCandidateBatchSize() / 2; + + List<String> candidatesBatch = new ArrayList<>(); + + while (candidates.hasNext()) { + String candidate = candidates.next(); + candidateLength += candidate.length(); + candidatesBatch.add(candidate); + if (candidateLength > candidateBatchSize) { + log.info("Candidate batch of size {} has exceeded the threshold. Attempting to delete " + + "what has been gathered so far.", candidateLength); + return candidatesBatch; + } + } + return candidatesBatch; + } + + @Override + public Stream<String> getBlipPaths() throws TableNotFoundException { + + if (level == Ample.DataLevel.ROOT) { + return Stream.empty(); + } + + int blipPrefixLen = MetadataSchema.BlipSection.getRowPrefix().length(); + var scanner = + new IsolatedScanner(context.createScanner(level.metaTable(), Authorizations.EMPTY)); + scanner.setRange(MetadataSchema.BlipSection.getRange()); + return scanner.stream() + .map(entry -> entry.getKey().getRow().toString().substring(blipPrefixLen)) + .onClose(scanner::close); + } + + @Override + public Stream<Reference> getReferences() { + Stream<TabletMetadata> tabletStream; + + if (level == Ample.DataLevel.ROOT) { + tabletStream = Stream.of(context.getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS)); + } else { + tabletStream = TabletsMetadata.builder(context).scanTable(level.metaTable()) + .checkConsistency().fetch(DIR, FILES, SCANS).build().stream(); + } + + return tabletStream.flatMap(tm -> { + Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream()) + .map(f -> new Reference(tm.getTableId(), f.getMetaUpdateDelete(), false)); + if (tm.getDirName() != null) { + refs = + Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDirName(), true))); + } + return refs; + }); + } + + @Override + public Set<TableId> getTableIDs() { + return context.getTableIdToNameMap().keySet(); + } + + @Override + public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException { + final VolumeManager fs = context.getVolumeManager(); + var metadataLocation = level == Ample.DataLevel.ROOT + ? context.getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable(); + + if (inSafeMode()) { + System.out.println("SAFEMODE: There are " + confirmedDeletes.size() + + " data file candidates marked for deletion in " + metadataLocation + ".\n" + + " Examine the log files to identify them.\n"); + log.info("SAFEMODE: Listing all data file candidates for deletion"); + for (String s : confirmedDeletes.values()) { + log.info("SAFEMODE: {}", s); + } + log.info("SAFEMODE: End candidates for deletion"); + return; + } + + List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>()); + + minimizeDeletes(confirmedDeletes, processedDeletes, fs, log); + + ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(config, Property.GC_DELETE_THREADS, false); + + final List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + + for (final String delete : confirmedDeletes.values()) { + + Runnable deleteTask = () -> { + boolean removeFlag = false; + + try { + Path fullPath; + Path switchedDelete = + VolumeUtil.switchVolume(delete, VolumeManager.FileType.TABLE, replacements); + if (switchedDelete != null) { + // actually replacing the volumes in the metadata table would be tricky because the + // entries would be different rows. 
So it could not be done + atomically in one mutation and extreme care would need to be taken that the delete + entry was not lost. Instead of doing that, just deal with + volume switching when something needs to be deleted. Since the rest of the code + uses suffixes to compare delete entries, there is no danger + of deleting something that should not be deleted. Must not change the value of the delete + variable because that's what's stored in the metadata table. + log.debug("Volume replaced {} -> {}", delete, switchedDelete); + fullPath = TabletFileUtil.validate(switchedDelete); + } else { + fullPath = new Path(TabletFileUtil.validate(delete)); + } + + for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) { + log.debug("Deleting {}", pathToDel); + + if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) { + // delete succeeded; we still want to remove the metadata candidate entry + removeFlag = true; + deleted++; + } else if (fs.exists(pathToDel)) { + // leave the entry in the metadata; we'll try again later + removeFlag = false; + errors++; + log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel); + break; + } else { + // despite this failure, we still want to remove the metadata entry + removeFlag = true; + errors++; + String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/"); + if (parts.length > 2) { + TableId tableId = TableId.of(parts[1]); + String tabletDir = parts[2]; + context.getTableManager().updateTableStateCache(tableId); + TableState tableState = context.getTableManager().getTableState(tableId); + if (tableState != null && tableState != TableState.DELETING) { + // clone directories don't always exist + if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) { + log.debug("File doesn't exist: {}", pathToDel); + } + } + } else { + log.warn("Very strange path name: {}", delete); + } + } + } + + // proceed to clearing out the flags for successful deletes and + // non-existent files + if (removeFlag) { + processedDeletes.add(delete); + } + } catch (Exception e) { + log.error("{}", e.getMessage(), e); + } + + }; + + deleteThreadPool.execute(deleteTask); + } + + deleteThreadPool.shutdown(); + + try { + while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty + } + } catch (InterruptedException e1) { + log.error("{}", e1.getMessage(), e1); + } + + context.getAmple().deleteGcCandidates(level, processedDeletes); + } + + @Override + public void deleteTableDirIfEmpty(TableId tableID) throws IOException { + final VolumeManager fs = context.getVolumeManager(); + // if the dir exists and is empty, then an empty list is returned...
+ // hadoop 2.0 will throw an exception if the file does not exist + for (String dir : context.getTablesDirs()) { + FileStatus[] tabletDirs; + try { + tabletDirs = fs.listStatus(new Path(dir + "/" + tableID)); + } catch (FileNotFoundException ex) { + continue; + } + + if (tabletDirs.length == 0) { + Path p = new Path(dir + "/" + tableID); + log.debug("Removing table dir {}", p); + if (!moveToTrash(p)) { + fs.delete(p); + } + } + } + } + + @Override + public void incrementCandidatesStat(long i) { + candidates += i; + } + + @Override + public void incrementInUseStat(long i) { + inUse += i; + } + + @Override + @Deprecated + public Iterator<Map.Entry<String,Replication.Status>> getReplicationNeededIterator() { + AccumuloClient client = context; + try { + Scanner s = org.apache.accumulo.core.replication.ReplicationTable.getScanner(client); + org.apache.accumulo.core.replication.ReplicationSchema.StatusSection.limit(s); + return Iterators.transform(s.iterator(), input -> { + String file = input.getKey().getRow().toString(); + Replication.Status stat; + try { + stat = Replication.Status.parseFrom(input.getValue().get()); + } catch (InvalidProtocolBufferException e) { + log.warn("Could not deserialize protobuf for: {}", input.getKey()); + stat = null; + } + return Maps.immutableEntry(file, stat); + }); + } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) { + // No elements that we need to preclude + return Collections.emptyIterator(); + } + } + + @VisibleForTesting + static void minimizeDeletes(SortedMap<String,String> confirmedDeletes, + List<String> processedDeletes, VolumeManager fs, Logger logger) { + Set<Path> seenVolumes = new HashSet<>(); + + // when deleting a dir and all files in that dir, only need to delete the dir. + // The dir will sort right before the files... so remove the files in this case + // to minimize namenode ops + Iterator<Map.Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator(); + + String lastDirRel = null; + Path lastDirAbs = null; + while (cdIter.hasNext()) { + Map.Entry<String,String> entry = cdIter.next(); + String relPath = entry.getKey(); + Path absPath = new Path(entry.getValue()); + + if (SimpleGarbageCollector.isDir(relPath)) { + lastDirRel = relPath; + lastDirAbs = absPath; + } else if (lastDirRel != null) { + if (relPath.startsWith(lastDirRel)) { + Path vol = VolumeManager.FileType.TABLE.getVolume(absPath); + + boolean sameVol = false; + + if (GcVolumeUtil.isAllVolumesUri(lastDirAbs)) { + if (seenVolumes.contains(vol)) { + sameVol = true; + } else { + for (Volume cvol : fs.getVolumes()) { + if (cvol.containsPath(vol)) { + seenVolumes.add(vol); + sameVol = true; + } + } + } + } else { + sameVol = Objects.equals(VolumeManager.FileType.TABLE.getVolume(lastDirAbs), vol); + } + + if (sameVol) { + logger.info("Ignoring {} because {} exists", entry.getValue(), lastDirAbs); + processedDeletes.add(entry.getValue()); + cdIter.remove(); + } + } else { + lastDirRel = null; + lastDirAbs = null; + } + } + } + } + + /** + * Checks if safe mode is set; when it is, files will not be deleted. + * + * @return true if safe mode is set + */ + boolean inSafeMode() { + return context.getConfiguration().getBoolean(Property.GC_SAFEMODE); + } + + /** + * Moves a file to trash. If this garbage collector is not using trash, this method returns false + * and leaves the file alone. If the file is missing, this method returns false as opposed to + * throwing an exception.
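+ * Whether trash is used is determined by {@code Property.GC_TRASH_IGNORE}; see + * {@link #isUsingTrash()}.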
+ * + * @return true if the file was moved to trash + * @throws IOException + * if the volume manager encountered a problem + */ + boolean moveToTrash(Path path) throws IOException { + final VolumeManager fs = context.getVolumeManager(); + if (!isUsingTrash()) { + return false; + } + try { + return fs.moveToTrash(path); + } catch (FileNotFoundException ex) { + return false; + } + } + + /** + * Checks if the volume manager should move files to the trash rather than delete them. + * + * @return true if trash is used + */ + boolean isUsingTrash() { + return !config.getBoolean(Property.GC_TRASH_IGNORE); + } + + /** + * Gets the batch size for garbage collecting. + * + * @return candidate batch size. + */ + long getCandidateBatchSize() { + return config.getAsBytes(Property.GC_CANDIDATE_BATCH_SIZE); + } + + public long getInUseStat() { + return inUse; + } + + public long getDeletedStat() { + return deleted; + } + + public long getErrorsStat() { + return errors; + } + + public long getCandidatesStat() { + return candidates; + } +} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index f42cfdf463..0759961d08 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -18,60 +18,33 @@ */ package org.apache.accumulo.gc; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.SortedMap; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.IsolatedScanner; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TabletFileUtil; -import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsUtil; -import 
org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.fate.zookeeper.ServiceLock; import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher; @@ -80,26 +53,16 @@ import org.apache.accumulo.gc.metrics.GcMetrics; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManager.FileType; -import org.apache.accumulo.server.fs.VolumeUtil; -import org.apache.accumulo.server.gc.GcVolumeUtil; import org.apache.accumulo.server.manager.LiveTServerSet; -import org.apache.accumulo.server.replication.proto.Replication.Status; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.thrift.TProcessor; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; -import com.google.protobuf.InvalidProtocolBufferException; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -183,260 +146,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { return getConfiguration().getBoolean(Property.GC_SAFEMODE); } - private class GCEnv implements GarbageCollectionEnvironment { - - private final DataLevel level; - - GCEnv(Ample.DataLevel level) { - this.level = level; - } - - @Override - public Iterator<String> getCandidates() { - return getContext().getAmple().getGcCandidates(level); - } - - @Override - public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) { - long candidateLength = 0; - // Converting the bytes to approximate number of characters for batch size. - long candidateBatchSize = getCandidateBatchSize() / 2; - - List<String> candidatesBatch = new ArrayList<>(); - - while (candidates.hasNext()) { - String candidate = candidates.next(); - candidateLength += candidate.length(); - candidatesBatch.add(candidate); - if (candidateLength > candidateBatchSize) { - log.info("Candidate batch of size {} has exceeded the threshold. 
Attempting to delete " - + "what has been gathered so far.", candidateLength); - return candidatesBatch; - } - } - return candidatesBatch; - } - - @Override - public Stream<String> getBlipPaths() throws TableNotFoundException { - - if (level == DataLevel.ROOT) { - return Stream.empty(); - } - - int blipPrefixLen = BlipSection.getRowPrefix().length(); - var scanner = - new IsolatedScanner(getContext().createScanner(level.metaTable(), Authorizations.EMPTY)); - scanner.setRange(BlipSection.getRange()); - return scanner.stream() - .map(entry -> entry.getKey().getRow().toString().substring(blipPrefixLen)) - .onClose(scanner::close); - } - - @Override - public Stream<Reference> getReferences() { - - Stream<TabletMetadata> tabletStream; - - if (level == DataLevel.ROOT) { - tabletStream = - Stream.of(getContext().getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS)); - } else { - tabletStream = TabletsMetadata.builder(getContext()).scanTable(level.metaTable()) - .checkConsistency().fetch(DIR, FILES, SCANS).build().stream(); - } - - return tabletStream.flatMap(tm -> { - Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream()) - .map(f -> new Reference(tm.getTableId(), f.getMetaUpdateDelete(), false)); - if (tm.getDirName() != null) { - refs = - Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDirName(), true))); - } - return refs; - }); - } - - @Override - public Set<TableId> getTableIDs() { - return getContext().getTableIdToNameMap().keySet(); - } - - @Override - public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException { - final VolumeManager fs = getContext().getVolumeManager(); - var metadataLocation = level == DataLevel.ROOT - ? getContext().getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable(); - - if (inSafeMode()) { - System.out.println("SAFEMODE: There are " + confirmedDeletes.size() - + " data file candidates marked for deletion in " + metadataLocation + ".\n" - + " Examine the log files to identify them.\n"); - log.info("SAFEMODE: Listing all data file candidates for deletion"); - for (String s : confirmedDeletes.values()) { - log.info("SAFEMODE: {}", s); - } - log.info("SAFEMODE: End candidates for deletion"); - return; - } - - List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>()); - - minimizeDeletes(confirmedDeletes, processedDeletes, fs); - - ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(getConfiguration(), Property.GC_DELETE_THREADS, false); - - final List<Pair<Path,Path>> replacements = getContext().getVolumeReplacements(); - - for (final String delete : confirmedDeletes.values()) { - - Runnable deleteTask = () -> { - boolean removeFlag = false; - - try { - Path fullPath; - Path switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements); - if (switchedDelete != null) { - // actually replacing the volumes in the metadata table would be tricky because the - // entries would be different rows. So it could not be - // atomically in one mutation and extreme care would need to be taken that delete - // entry was not lost. Instead of doing that, just deal with - // volume switching when something needs to be deleted. Since the rest of the code - // uses suffixes to compare delete entries, there is no danger - // of deleting something that should not be deleted. Must not change value of delete - // variable because that's what's stored in metadata table. 
- log.debug("Volume replaced {} -> {}", delete, switchedDelete); - fullPath = TabletFileUtil.validate(switchedDelete); - } else { - fullPath = new Path(TabletFileUtil.validate(delete)); - } - - for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) { - log.debug("Deleting {}", pathToDel); - - if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) { - // delete succeeded, still want to delete - removeFlag = true; - synchronized (SimpleGarbageCollector.this) { - ++status.current.deleted; - } - } else if (fs.exists(pathToDel)) { - // leave the entry in the metadata; we'll try again later - removeFlag = false; - synchronized (SimpleGarbageCollector.this) { - ++status.current.errors; - } - log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel); - break; - } else { - // this failure, we still want to remove the metadata entry - removeFlag = true; - synchronized (SimpleGarbageCollector.this) { - ++status.current.errors; - } - String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/"); - if (parts.length > 2) { - TableId tableId = TableId.of(parts[1]); - String tabletDir = parts[2]; - getContext().getTableManager().updateTableStateCache(tableId); - TableState tableState = getContext().getTableManager().getTableState(tableId); - if (tableState != null && tableState != TableState.DELETING) { - // clone directories don't always exist - if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) { - log.debug("File doesn't exist: {}", pathToDel); - } - } - } else { - log.warn("Very strange path name: {}", delete); - } - } - } - - // proceed to clearing out the flags for successful deletes and - // non-existent files - if (removeFlag) { - processedDeletes.add(delete); - } - } catch (Exception e) { - log.error("{}", e.getMessage(), e); - } - - }; - - deleteThreadPool.execute(deleteTask); - } - - deleteThreadPool.shutdown(); - - try { - while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty - } - } catch (InterruptedException e1) { - log.error("{}", e1.getMessage(), e1); - } - - getContext().getAmple().deleteGcCandidates(level, processedDeletes); - } - - @Override - public void deleteTableDirIfEmpty(TableId tableID) throws IOException { - final VolumeManager fs = getContext().getVolumeManager(); - // if dir exist and is empty, then empty list is returned... 
- // hadoop 2.0 will throw an exception if the file does not exist - for (String dir : getContext().getTablesDirs()) { - FileStatus[] tabletDirs; - try { - tabletDirs = fs.listStatus(new Path(dir + "/" + tableID)); - } catch (FileNotFoundException ex) { - continue; - } - - if (tabletDirs.length == 0) { - Path p = new Path(dir + "/" + tableID); - log.debug("Removing table dir {}", p); - if (!moveToTrash(p)) { - fs.delete(p); - } - } - } - } - - @Override - public void incrementCandidatesStat(long i) { - status.current.candidates += i; - } - - @Override - public void incrementInUseStat(long i) { - status.current.inUse += i; - } - - @Override - @Deprecated - public Iterator<Entry<String,Status>> getReplicationNeededIterator() { - AccumuloClient client = getContext(); - try { - Scanner s = org.apache.accumulo.core.replication.ReplicationTable.getScanner(client); - org.apache.accumulo.core.replication.ReplicationSchema.StatusSection.limit(s); - return Iterators.transform(s.iterator(), input -> { - String file = input.getKey().getRow().toString(); - Status stat; - try { - stat = Status.parseFrom(input.getValue().get()); - } catch (InvalidProtocolBufferException e) { - log.warn("Could not deserialize protobuf for: {}", input.getKey()); - stat = null; - } - return Maps.immutableEntry(file, stat); - }); - } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) { - // No elements that we need to preclude - return Collections.emptyIterator(); - } - } - } - @Override @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit") public void run() { @@ -493,15 +202,24 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { System.gc(); // make room status.current.started = System.currentTimeMillis(); + var rootGC = new GCRun(DataLevel.ROOT, getContext()); + var mdGC = new GCRun(DataLevel.METADATA, getContext()); + var userGC = new GCRun(DataLevel.USER, getContext()); + + log.info("Starting Root table Garbage Collection."); + new GarbageCollectionAlgorithm().collect(rootGC); + incrementStatsForRun(rootGC); + logStats(); - new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.ROOT)); - new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.METADATA)); - new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.USER)); + log.info("Starting Metadata table Garbage Collection."); + new GarbageCollectionAlgorithm().collect(mdGC); + incrementStatsForRun(mdGC); + logStats(); - log.info("Number of data file candidates for deletion: {}", status.current.candidates); - log.info("Number of data file candidates still in use: {}", status.current.inUse); - log.info("Number of successfully deleted data files: {}", status.current.deleted); - log.info("Number of data files delete failures: {}", status.current.errors); + log.info("Starting User table Garbage Collection."); + new GarbageCollectionAlgorithm().collect(userGC); + incrementStatsForRun(userGC); + logStats(); status.current.finished = System.currentTimeMillis(); status.last = status.current; @@ -608,6 +326,20 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { } } + private void incrementStatsForRun(GCRun gcRun) { + status.current.candidates += gcRun.getCandidatesStat(); + status.current.inUse += gcRun.getInUseStat(); + status.current.deleted += gcRun.getDeletedStat(); + status.current.errors += gcRun.getErrorsStat(); + } + + private void logStats() { + log.info("Number of data file candidates for deletion: {}", status.current.candidates); + 
log.info("Number of data file candidates still in use: {}", status.current.inUse); + log.info("Number of successfully deleted data files: {}", status.current.deleted); + log.info("Number of data files delete failures: {}", status.current.errors); + } + /** * Moves a file to trash. If this garbage collector is not using trash, this method returns false * and leaves the file alone. If the file is missing, this method returns false as opposed to @@ -702,60 +434,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { return slashCount == 1; } - @VisibleForTesting - static void minimizeDeletes(SortedMap<String,String> confirmedDeletes, - List<String> processedDeletes, VolumeManager fs) { - Set<Path> seenVolumes = new HashSet<>(); - - // when deleting a dir and all files in that dir, only need to delete the dir. - // The dir will sort right before the files... so remove the files in this case - // to minimize namenode ops - Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator(); - - String lastDirRel = null; - Path lastDirAbs = null; - while (cdIter.hasNext()) { - Entry<String,String> entry = cdIter.next(); - String relPath = entry.getKey(); - Path absPath = new Path(entry.getValue()); - - if (isDir(relPath)) { - lastDirRel = relPath; - lastDirAbs = absPath; - } else if (lastDirRel != null) { - if (relPath.startsWith(lastDirRel)) { - Path vol = FileType.TABLE.getVolume(absPath); - - boolean sameVol = false; - - if (GcVolumeUtil.isAllVolumesUri(lastDirAbs)) { - if (seenVolumes.contains(vol)) { - sameVol = true; - } else { - for (Volume cvol : fs.getVolumes()) { - if (cvol.containsPath(vol)) { - seenVolumes.add(vol); - sameVol = true; - } - } - } - } else { - sameVol = Objects.equals(FileType.TABLE.getVolume(lastDirAbs), vol); - } - - if (sameVol) { - log.info("Ignoring {} because {} exist", entry.getValue(), lastDirAbs); - processedDeletes.add(entry.getValue()); - cdIter.remove(); - } - } else { - lastDirRel = null; - lastDirAbs = null; - } - } - } - } - @Override public GCStatus getStatus(TInfo info, TCredentials credentials) { return status; diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java index 337a816ee0..4775825611 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java @@ -53,8 +53,12 @@ import org.apache.accumulo.server.security.SystemCredentials; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SimpleGarbageCollectorTest { + private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollectorTest.class); + private VolumeManager volMgr; private ServerContext context; private Credentials credentials; @@ -178,7 +182,7 @@ public class SimpleGarbageCollectorTest { List<String> processedDeletes = new ArrayList<>(); - SimpleGarbageCollector.minimizeDeletes(confirmed, processedDeletes, volMgr2); + GCRun.minimizeDeletes(confirmed, processedDeletes, volMgr2, log); TreeMap<String,String> expected = new TreeMap<>(); expected.put("5a/t-0001", "hdfs://nn1/accumulo/tables/5a/t-0001");