This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new d1b06d3 Moves gc candidate code insertion to Ample (#1762) d1b06d3 is described below commit d1b06d317983d460039861d4ab462d5a65ca3c11 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Nov 3 12:38:55 2020 -0500 Moves gc candidate code insertion to Ample (#1762) --- .../accumulo/core/metadata/schema/Ample.java | 7 +++++++ .../accumulo/server/metadata/RootGcCandidates.java | 5 +++-- .../accumulo/server/metadata/ServerAmpleImpl.java | 22 +++++++++++++++++++++- .../accumulo/server/util/MetadataTableUtil.java | 18 ------------------ .../apache/accumulo/master/TabletGroupWatcher.java | 6 +++--- .../tableOps/bulkVer1/CleanUpBulkImport.java | 5 ++++- .../tableOps/bulkVer2/CleanUpBulkImport.java | 4 +++- 7 files changed, 41 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 4703b32..70044ae 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -144,6 +144,13 @@ public interface Ample { throw new UnsupportedOperationException(); } + /** + * Unlike {@link #putGcCandidates(TableId, Collection)} this takes file and dir GC candidates. 
+ */ + default void putGcFileAndDirCandidates(TableId tableId, Collection<String> candidates) { + throw new UnsupportedOperationException(); + } + default void deleteGcCandidates(DataLevel level, Collection<String> paths) { throw new UnsupportedOperationException(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java index 53dd6c5..7823750 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java @@ -21,6 +21,7 @@ package org.apache.accumulo.server.metadata; import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Collection; +import java.util.Iterator; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -63,8 +64,8 @@ public class RootGcCandidates { this.candidates = candidates; } - public void add(Collection<StoredTabletFile> refs) { - refs.forEach(ref -> { + public void add(Iterator<StoredTabletFile> refs) { + refs.forEachRemaining(ref -> { String parent = ref.getPath().getParent().toString(); candidates.computeIfAbsent(parent, k -> new TreeSet<>()).add(ref.getFileName()); }); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a11f317..e735427 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -105,7 +105,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { public void putGcCandidates(TableId tableId, Collection<StoredTabletFile> candidates) { if (RootTable.ID.equals(tableId)) { - mutateRootGcCandidates(rgcc -> rgcc.add(candidates)); + mutateRootGcCandidates(rgcc -> 
rgcc.add(candidates.iterator())); return; } @@ -119,6 +119,26 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } @Override + public void putGcFileAndDirCandidates(TableId tableId, Collection<String> candidates) { + + if (RootTable.ID.equals(tableId)) { + + // Directories are unexpected for the root tablet, so convert to stored tablet file + mutateRootGcCandidates( + rgcc -> rgcc.add(candidates.stream().map(StoredTabletFile::new).iterator())); + return; + } + + try (BatchWriter writer = createWriter(tableId)) { + for (String fileOrDir : candidates) { + writer.addMutation(createDeleteMutation(fileOrDir)); + } + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + + @Override public void deleteGcCandidates(DataLevel level, Collection<String> paths) { if (level == DataLevel.ROOT) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index a8c6011..8cb64a5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -279,24 +279,6 @@ public class MetadataTableUtil { finishSplit(extent.toMetaRow(), datafileSizes, highDatafilesToRemove, context, zooLock); } - /** - * datafilesToDelete are strings because they can be a TabletFile or directory - */ - public static void addDeleteEntries(KeyExtent extent, Set<String> datafilesToDelete, - ServerContext context, Ample ample) { - - // TODO could use batch writer,would need to handle failure and retry like update does - - // ACCUMULO-1294 - for (String pathToRemove : datafilesToDelete) { - update(context, ample.createDeleteMutation(pathToRemove), extent); - } - } - - public static void addDeleteEntry(ServerContext context, TableId tableId, String path) { - update(context, context.getAmple().createDeleteMutation(path), - new 
KeyExtent(tableId, null, null)); - } - public static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile> scanFiles, ServerContext context, ZooLock zooLock) { TabletMutator tablet = context.getAmple().mutateTablet(extent); diff --git a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index d12eb67..e73ae66 100644 --- a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -642,7 +642,7 @@ abstract class TabletGroupWatcher extends Daemon { if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { datafiles.add(TabletFileUtil.validate(key.getColumnQualifierData().toString())); if (datafiles.size() > 1000) { - MetadataTableUtil.addDeleteEntries(extent, datafiles, context, ample); + ample.putGcFileAndDirCandidates(extent.tableId(), datafiles); datafiles.clear(); } } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { @@ -655,12 +655,12 @@ abstract class TabletGroupWatcher extends Daemon { entry.getValue().toString()); datafiles.add(path); if (datafiles.size() > 1000) { - MetadataTableUtil.addDeleteEntries(extent, datafiles, context, ample); + ample.putGcFileAndDirCandidates(extent.tableId(), datafiles); datafiles.clear(); } } } - MetadataTableUtil.addDeleteEntries(extent, datafiles, context, ample); + ample.putGcFileAndDirCandidates(extent.tableId(), datafiles); BatchWriter bw = client.createBatchWriter(targetSystemTable, new BatchWriterConfig()); try { deleteTablets(info, deleteRange, bw, client); diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java index b778410..1bd6342 100644 --- 
a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.master.tableOps.bulkVer1; +import java.util.Collections; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.data.TableId; @@ -58,7 +60,8 @@ public class CleanUpBulkImport extends MasterRepo { Path bulkDir = new Path(bulk); MetadataTableUtil.removeBulkLoadInProgressFlag(master.getContext(), "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); - MetadataTableUtil.addDeleteEntry(master.getContext(), tableId, bulkDir.toString()); + master.getContext().getAmple().putGcFileAndDirCandidates(tableId, + Collections.singleton(bulkDir.toString())); log.debug("removing the metadata table markers for loaded files"); AccumuloClient client = master.getContext(); MetadataTableUtil.removeBulkLoadEntries(client, tableId, tid); diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java index d0d219b..a15869c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java @@ -19,6 +19,7 @@ package org.apache.accumulo.master.tableOps.bulkVer2; import java.io.IOException; +import java.util.Collections; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; @@ -52,7 +53,8 @@ public class CleanUpBulkImport extends MasterRepo { Path bulkDir = new Path(info.bulkDir); MetadataTableUtil.removeBulkLoadInProgressFlag(master.getContext(), "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); - 
MetadataTableUtil.addDeleteEntry(master.getContext(), info.tableId, bulkDir.toString()); + master.getContext().getAmple().putGcFileAndDirCandidates(info.tableId, + Collections.singleton(bulkDir.toString())); if (info.tableState == TableState.ONLINE) { log.debug("removing the metadata table markers for loaded files"); AccumuloClient client = master.getContext();