This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new e776715d87  fixes #473 avoids scanning entire table metadata for bulk import (#3336)
e776715d87 is described below

commit e776715d8763309cffa314d14d3c2c41584b6402
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 9 15:54:01 2023 -0400

    fixes #473 avoids scanning entire table metadata for bulk import (#3336)
---
 .../accumulo/core/metadata/schema/Ample.java       |  4 ++-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  4 +--
 .../tableOps/bulkVer1/CleanUpBulkImport.java       |  2 +-
 .../manager/tableOps/bulkVer2/BulkInfo.java        |  4 +++
 .../tableOps/bulkVer2/CleanUpBulkImport.java       | 16 +++++++++---
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  | 30 +++++++++++++++++++---
 .../tableOps/bulkVer2/PrepBulkImportTest.java      | 12 ++++++++-
 7 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 7db46c77a0..8911c137f6 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -364,8 +364,10 @@ public interface Ample {
    *
    * @param tableId Table ID for transaction removals
    * @param tid Transaction ID to remove
+   * @param firstSplit non-inclusive table split point at which to start looking for load markers
+   * @param lastSplit inclusive tablet split point at which to stop looking for load markers
    */
-  default void removeBulkLoadEntries(TableId tableId, long tid) {
+  default void removeBulkLoadEntries(TableId tableId, long tid, Text firstSplit, Text lastSplit) {
     throw new UnsupportedOperationException();
   }

diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index d23917f0a3..03cdb4666d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -184,13 +184,13 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
   }
 
   @Override
-  public void removeBulkLoadEntries(TableId tableId, long tid) {
+  public void removeBulkLoadEntries(TableId tableId, long tid, Text firstSplit, Text lastSplit) {
     Preconditions.checkArgument(DataLevel.of(tableId) == DataLevel.USER);
     try (
         Scanner mscanner = new IsolatedScanner(context.createScanner(MetadataTable.NAME,
            Authorizations.EMPTY));
         BatchWriter bw = context.createBatchWriter(MetadataTable.NAME)) {
-      mscanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
+      mscanner.setRange(new KeyExtent(tableId, lastSplit, firstSplit).toMetaRange());
       mscanner.fetchColumnFamily(BulkFileColumnFamily.NAME);
 
       for (Map.Entry<Key,Value> entry : mscanner) {
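The heart of the change is the scanner range in removeBulkLoadEntries: new KeyExtent(tableId, null, null).toMetaRange() covers the metadata rows of every tablet in the table, while new KeyExtent(tableId, lastSplit, firstSplit).toMetaRange() covers only the tablets between the two bounds (prev end row exclusive, end row inclusive). A minimal, self-contained sketch of the difference; the table id and split points here are invented and not part of the commit:

    import org.apache.accumulo.core.data.TableId;
    import org.apache.accumulo.core.dataImpl.KeyExtent;
    import org.apache.hadoop.io.Text;

    public class MetaRangeSketch {
      public static void main(String[] args) {
        TableId tableId = TableId.of("1"); // hypothetical table id

        // Old behavior: null bounds select the metadata rows of every tablet.
        var wholeTable = new KeyExtent(tableId, null, null).toMetaRange();

        // New behavior: end row "t" (inclusive) and prev end row "m" (exclusive)
        // select only the tablets that could hold load markers.
        var narrowed = new KeyExtent(tableId, new Text("t"), new Text("m")).toMetaRange();

        System.out.println(wholeTable);
        System.out.println(narrowed);
      }
    }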
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
index df9564c805..7cf276dc81 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
@@ -64,7 +64,7 @@ public class CleanUpBulkImport extends ManagerRepo {
     ample.putGcFileAndDirCandidates(tableId,
         Collections.singleton(new ReferenceFile(tableId, bulkDir.toString())));
     log.debug("removing the metadata table markers for loaded files");
-    ample.removeBulkLoadEntries(tableId, tid);
+    ample.removeBulkLoadEntries(tableId, tid, null, null);
     log.debug("releasing HDFS reservations for " + source + " and " + error);
     Utils.unreserveHdfsDirectory(manager, source, tid);
     Utils.unreserveHdfsDirectory(manager, error, tid);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
index 15d79d4d8a..ff581c9dc2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
@@ -34,4 +34,8 @@ class BulkInfo implements Serializable {
   String bulkDir;
   boolean setTime;
   TableState tableState;
+  // firstSplit and lastSplit describe the min and max splits in the table that overlap the bulk
+  // imported data
+  byte[] firstSplit;
+  byte[] lastSplit;
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index dffe0671e5..f681055513 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,8 @@ public class CleanUpBulkImport extends ManagerRepo {
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
     manager.updateBulkImportStatus(info.sourceDir, BulkImportState.CLEANUP);
-    log.debug("removing the bulkDir processing flag file in " + info.bulkDir);
+    log.debug("{} removing the bulkDir processing flag file in {}", FateTxId.formatTid(tid),
+        info.bulkDir);
     Ample ample = manager.getContext().getAmple();
     Path bulkDir = new Path(info.bulkDir);
     ample.removeBulkLoadInProgressFlag(
@@ -59,8 +61,14 @@ public class CleanUpBulkImport extends ManagerRepo {
     ample.putGcFileAndDirCandidates(info.tableId,
         Collections.singleton(new ReferenceFile(info.tableId, bulkDir.toString())));
     if (info.tableState == TableState.ONLINE) {
-      log.debug("removing the metadata table markers for loaded files");
-      ample.removeBulkLoadEntries(info.tableId, tid);
+
+      Text firstSplit = info.firstSplit == null ? null : new Text(info.firstSplit);
+      Text lastSplit = info.lastSplit == null ? null : new Text(info.lastSplit);
+
+      log.debug("{} removing the metadata table markers for loaded files in range {} {}",
+          FateTxId.formatTid(tid), firstSplit, lastSplit);
+
+      ample.removeBulkLoadEntries(info.tableId, tid, firstSplit, lastSplit);
     }
     Utils.unreserveHdfsDirectory(manager, info.sourceDir, tid);
     Utils.getReadLock(manager, info.tableId, tid).unlock();
@@ -71,7 +79,7 @@ public class CleanUpBulkImport extends ManagerRepo {
       manager.getVolumeManager().delete(renamingFile);
       manager.getVolumeManager().delete(mappingFile);
     } catch (IOException ioe) {
-      log.debug("Failed to delete renames and/or loadmap", ioe);
+      log.debug("{} Failed to delete renames and/or loadmap", FateTxId.formatTid(tid), ioe);
     }
 
     log.debug("completing bulkDir import transaction " + FateTxId.formatTid(tid));
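BulkInfo carries the split points as byte arrays rather than Text objects, most likely because the repo is persisted through Java serialization and Hadoop's Text is a Writable, not java.io.Serializable. CleanUpBulkImport therefore rebuilds Text values on the way out, with null meaning the range is unbounded on that side. A standalone sketch of the round trip, with an invented split point:

    import org.apache.hadoop.io.Text;

    public class SplitRoundTripSketch {
      public static void main(String[] args) {
        // Storing, as in PrepBulkImport: Text -> byte[] for the serializable BulkInfo.
        Text prevEndRow = new Text("m"); // hypothetical split point
        byte[] firstSplit = prevEndRow.getBytes();

        // Restoring, as in CleanUpBulkImport: byte[] -> Text, where a null array
        // means the removal range is open on that side.
        Text firstSplitText = firstSplit == null ? null : new Text(firstSplit);
        System.out.println(firstSplitText); // prints m
      }
    }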
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 629541a382..56116365b4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -113,7 +115,7 @@ public class PrepBulkImport extends ManagerRepo {
    * file goes to too many tablets.
    */
   @VisibleForTesting
-  static void sanityCheckLoadMapping(String tableId, LoadMappingIterator lmi,
+  static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,
       TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
     var currRange = lmi.next();
 
@@ -126,6 +128,9 @@ public class PrepBulkImport extends ManagerRepo {
     var fileCounts = new HashMap<String,Integer>();
     int count;
 
+    KeyExtent firstTablet = currRange.getKey();
+    KeyExtent lastTablet = currRange.getKey();
+
     if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
         && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
       currRange = null;
@@ -138,6 +143,7 @@ public class PrepBulkImport extends ManagerRepo {
           break;
         }
         currRange = lmi.next();
+        lastTablet = currRange.getKey();
       }
 
       while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
@@ -146,6 +152,11 @@ public class PrepBulkImport extends ManagerRepo {
       }
 
       boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey());
+
+      if (matchedPrevRow && firstTablet == null) {
+        firstTablet = currTablet;
+      }
+
       count = matchedPrevRow ? 1 : 0;
 
       while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) {
@@ -179,9 +190,11 @@ public class PrepBulkImport extends ManagerRepo {
             + ") number of tablets: " + new TreeMap<>(fileCounts));
       }
     }
+
+    return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());
   }
 
-  private void checkForMerge(final long tid, final Manager manager) throws Exception {
+  private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception {
     VolumeManager fs = manager.getVolumeManager();
     final Path bulkDir = new Path(bulkInfo.sourceDir);
 
@@ -197,14 +210,23 @@ public class PrepBulkImport extends ManagerRepo {
           .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream()
           .map(TabletMetadata::getExtent).iterator();
 
-      sanityCheckLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid);
+      return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
+          tid);
     }
   }
 
   @Override
   public Repo<Manager> call(final long tid, final Manager manager) throws Exception {
     // now that table lock is acquired check that all splits in load mapping exists in table
-    checkForMerge(tid, manager);
+    KeyExtent tabletsRange = checkForMerge(tid, manager);
+
+    bulkInfo.firstSplit =
+        Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);
+    bulkInfo.lastSplit =
+        Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null);
+
+    log.trace("{} first split:{} last split:{}", FateTxId.formatTid(tid), tabletsRange.prevEndRow(),
+        tabletsRange.endRow());
 
     bulkInfo.tableState = manager.getContext().getTableState(bulkInfo.tableId);
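validateLoadMapping (renamed from sanityCheckLoadMapping, since it now returns a value) reports the span of tablets that matched the load mapping as a single KeyExtent, and call() unpacks that extent's bounds into BulkInfo, mapping an absent bound to null. A sketch of the unpacking with a hypothetical extent:

    import java.util.Optional;

    import org.apache.accumulo.core.data.TableId;
    import org.apache.accumulo.core.dataImpl.KeyExtent;
    import org.apache.hadoop.io.Text;

    public class TabletsRangeSketch {
      public static void main(String[] args) {
        // Hypothetical result: the imported files overlap the tablets spanning (m, t].
        KeyExtent tabletsRange = new KeyExtent(TableId.of("1"), new Text("t"), new Text("m"));

        // Same Optional handling as in PrepBulkImport.call(): either bound can be
        // null when the overlapped tablets include the first or last tablet.
        byte[] firstSplit =
            Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);
        byte[] lastSplit =
            Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null);

        System.out.println(firstSplit == null ? null : new Text(firstSplit)); // m
        System.out.println(lastSplit == null ? null : new Text(lastSplit)); // t
      }
    }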
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index 0a98e0a8d7..62bbbd1402 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.manager.tableOps.bulkVer2;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -32,6 +33,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -111,8 +113,16 @@ public class PrepBulkImportTest {
       return tabletRanges.subList(start, tabletRanges.size()).iterator();
     };
 
+    var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList());
+    String minPrevEndRow =
+        Optional.ofNullable(sortedExtents.get(0).prevEndRow()).map(Text::toString).orElse(null);
+    String maxPrevEndRow = Optional.ofNullable(sortedExtents.get(sortedExtents.size() - 1).endRow())
+        .map(Text::toString).orElse(null);
+
     try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
-      PrepBulkImport.sanityCheckLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001);
+      var extent =
+          PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001);
+      assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges);
     }
   }
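The strengthened test pins down the new return value: the extent handed back by validateLoadMapping must run from the smallest prev end row to the largest end row of the load mapping (nke is a helper already present in this test class, assumed here to build an extent of table "1" from two row strings). A self-contained rendering of the same expectation, with an invented mapping and a stand-in for nke:

    import java.util.List;

    import org.apache.accumulo.core.data.TableId;
    import org.apache.accumulo.core.dataImpl.KeyExtent;
    import org.apache.hadoop.io.Text;

    public class ExpectedRangeSketch {
      // Stand-in for the test's nke() helper; assumed to build an extent of table "1".
      static KeyExtent nke(String prevEndRow, String endRow) {
        return new KeyExtent(TableId.of("1"), endRow == null ? null : new Text(endRow),
            prevEndRow == null ? null : new Text(prevEndRow));
      }

      public static void main(String[] args) {
        // Invented, already-sorted load mapping extents: (null,c], (c,m], (m,t].
        List<KeyExtent> sorted = List.of(nke(null, "c"), nke("c", "m"), nke("m", "t"));

        // Expected span: first extent's prev end row through last extent's end row.
        KeyExtent derived = new KeyExtent(TableId.of("1"),
            sorted.get(sorted.size() - 1).endRow(), sorted.get(0).prevEndRow());

        System.out.println(derived.equals(nke(null, "t"))); // prints true
      }
    }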