This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e776715d87 fixes #473 avoids scanning entire table metadata for bulk import (#3336)
e776715d87 is described below

commit e776715d8763309cffa314d14d3c2c41584b6402
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 9 15:54:01 2023 -0400

    fixes #473 avoids scanning entire table metadata for bulk import (#3336)
---
 .../accumulo/core/metadata/schema/Ample.java       |  4 ++-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  4 +--
 .../tableOps/bulkVer1/CleanUpBulkImport.java       |  2 +-
 .../manager/tableOps/bulkVer2/BulkInfo.java        |  4 +++
 .../tableOps/bulkVer2/CleanUpBulkImport.java       | 16 +++++++++---
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  | 30 +++++++++++++++++++---
 .../tableOps/bulkVer2/PrepBulkImportTest.java      | 12 ++++++++-
 7 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 7db46c77a0..8911c137f6 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -364,8 +364,10 @@ public interface Ample {
    *
    * @param tableId Table ID for transaction removals
    * @param tid Transaction ID to remove
+   * @param firstSplit non-inclusive table split point at which to start looking for load markers
+   * @param lastSplit inclusive tablet split point at which to stop looking for load markers
    */
-  default void removeBulkLoadEntries(TableId tableId, long tid) {
+  default void removeBulkLoadEntries(TableId tableId, long tid, Text firstSplit, Text lastSplit) {
     throw new UnsupportedOperationException();
   }
 
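The new overload narrows the metadata scan: firstSplit is exclusive, lastSplit is inclusive, and passing null for both keeps the old whole-table behavior. A rough caller-side sketch (the ample handle, tableId, tid, and split values are illustrative, not part of this commit):

    // Hypothetical caller: confine load-marker cleanup to the tablets that
    // actually received bulk files instead of scanning the whole table.
    Text firstSplit = new Text("m");  // exclusive: start after the tablet ending at "m"
    Text lastSplit = new Text("t");   // inclusive: stop at the tablet ending at "t"
    ample.removeBulkLoadEntries(tableId, tid, firstSplit, lastSplit);

    // The old behavior is still available by passing nulls (bulkVer1 cleanup does this):
    ample.removeBulkLoadEntries(tableId, tid, null, null);
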
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index d23917f0a3..03cdb4666d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -184,13 +184,13 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
   }
 
   @Override
-  public void removeBulkLoadEntries(TableId tableId, long tid) {
+  public void removeBulkLoadEntries(TableId tableId, long tid, Text firstSplit, Text lastSplit) {
     Preconditions.checkArgument(DataLevel.of(tableId) == DataLevel.USER);
     try (
         Scanner mscanner =
             new IsolatedScanner(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
         BatchWriter bw = context.createBatchWriter(MetadataTable.NAME)) {
-      mscanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
+      mscanner.setRange(new KeyExtent(tableId, lastSplit, firstSplit).toMetaRange());
       mscanner.fetchColumnFamily(BulkFileColumnFamily.NAME);
 
       for (Map.Entry<Key,Value> entry : mscanner) {
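
The argument order in the new setRange call follows KeyExtent's (tableId, endRow, prevEndRow) constructor: lastSplit becomes the extent's inclusive end row and firstSplit its exclusive previous end row, so toMetaRange() covers only the tablets between the two splits. A small sketch with made-up values:

    // Illustrative values; toMetaRange() turns the extent into a Range over the
    // metadata rows of just these tablets.
    TableId tableId = TableId.of("1");
    Text firstSplit = new Text("m");  // tablets ending at or before "m" are skipped
    Text lastSplit = new Text("t");   // the tablet ending at "t" is the last one scanned
    Range metaRange = new KeyExtent(tableId, lastSplit, firstSplit).toMetaRange();

    // The replaced code covered every tablet of the table:
    Range fullRange = new KeyExtent(tableId, null, null).toMetaRange();
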
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
index df9564c805..7cf276dc81 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
@@ -64,7 +64,7 @@ public class CleanUpBulkImport extends ManagerRepo {
     ample.putGcFileAndDirCandidates(tableId,
         Collections.singleton(new ReferenceFile(tableId, bulkDir.toString())));
     log.debug("removing the metadata table markers for loaded files");
-    ample.removeBulkLoadEntries(tableId, tid);
+    ample.removeBulkLoadEntries(tableId, tid, null, null);
     log.debug("releasing HDFS reservations for " + source + " and " + error);
     Utils.unreserveHdfsDirectory(manager, source, tid);
     Utils.unreserveHdfsDirectory(manager, error, tid);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
index 15d79d4d8a..ff581c9dc2 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkInfo.java
@@ -34,4 +34,8 @@ class BulkInfo implements Serializable {
   String bulkDir;
   boolean setTime;
   TableState tableState;
+  // firstSplit and lastSplit describe the min and max splits in the table that overlap the bulk
+  // imported data
+  byte[] firstSplit;
+  byte[] lastSplit;
 }
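
BulkInfo travels through FATE via Java serialization, and Hadoop's Text is a Writable rather than Serializable, which is presumably why the split points are stored as raw byte arrays; null means unbounded on that side. A sketch of the round trip between PrepBulkImport (store) and CleanUpBulkImport (load):

    // Store: convert a nullable Text row to a nullable byte[] for serialization.
    bulkInfo.firstSplit =
        Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);

    // Load: rebuild the nullable Text before querying the metadata table.
    Text firstSplit = info.firstSplit == null ? null : new Text(info.firstSplit);
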
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index dffe0671e5..f681055513 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,8 @@ public class CleanUpBulkImport extends ManagerRepo {
   @Override
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
     manager.updateBulkImportStatus(info.sourceDir, BulkImportState.CLEANUP);
-    log.debug("removing the bulkDir processing flag file in " + info.bulkDir);
+    log.debug("{} removing the bulkDir processing flag file in {}", 
FateTxId.formatTid(tid),
+        info.bulkDir);
     Ample ample = manager.getContext().getAmple();
     Path bulkDir = new Path(info.bulkDir);
     ample.removeBulkLoadInProgressFlag(
@@ -59,8 +61,14 @@ public class CleanUpBulkImport extends ManagerRepo {
     ample.putGcFileAndDirCandidates(info.tableId,
         Collections.singleton(new ReferenceFile(info.tableId, bulkDir.toString())));
     if (info.tableState == TableState.ONLINE) {
-      log.debug("removing the metadata table markers for loaded files");
-      ample.removeBulkLoadEntries(info.tableId, tid);
+
+      Text firstSplit = info.firstSplit == null ? null : new Text(info.firstSplit);
+      Text lastSplit = info.lastSplit == null ? null : new Text(info.lastSplit);
+
+      log.debug("{} removing the metadata table markers for loaded files in 
range {} {}",
+          FateTxId.formatTid(tid), firstSplit, lastSplit);
+
+      ample.removeBulkLoadEntries(info.tableId, tid, firstSplit, lastSplit);
     }
     Utils.unreserveHdfsDirectory(manager, info.sourceDir, tid);
     Utils.getReadLock(manager, info.tableId, tid).unlock();
@@ -71,7 +79,7 @@ public class CleanUpBulkImport extends ManagerRepo {
       manager.getVolumeManager().delete(renamingFile);
       manager.getVolumeManager().delete(mappingFile);
     } catch (IOException ioe) {
-      log.debug("Failed to delete renames and/or loadmap", ioe);
+      log.debug("{} Failed to delete renames and/or loadmap", 
FateTxId.formatTid(tid), ioe);
     }
 
     log.debug("completing bulkDir import transaction " + 
FateTxId.formatTid(tid));
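
The reworked log calls tag each message with the FATE transaction id so concurrent bulk imports can be told apart. The exception case still prints a stack trace: in SLF4J, a throwable passed as the final argument beyond what the "{}" placeholders consume is treated as the attached exception:

    // One "{}" consumes formatTid(tid); ioe has no placeholder, so SLF4J logs it
    // as the attached exception, stack trace included.
    log.debug("{} Failed to delete renames and/or loadmap", FateTxId.formatTid(tid), ioe);
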
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 629541a382..56116365b4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -113,7 +115,7 @@ public class PrepBulkImport extends ManagerRepo {
    * file goes to too many tablets.
    */
   @VisibleForTesting
-  static void sanityCheckLoadMapping(String tableId, LoadMappingIterator lmi,
+  static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,
       TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
     var currRange = lmi.next();
 
@@ -126,6 +128,9 @@ public class PrepBulkImport extends ManagerRepo {
     var fileCounts = new HashMap<String,Integer>();
     int count;
 
+    KeyExtent firstTablet = currRange.getKey();
+    KeyExtent lastTablet = currRange.getKey();
+
     if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
         && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
       currRange = null;
@@ -138,6 +143,7 @@ public class PrepBulkImport extends ManagerRepo {
           break;
         }
         currRange = lmi.next();
+        lastTablet = currRange.getKey();
       }
 
       while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
@@ -146,6 +152,11 @@ public class PrepBulkImport extends ManagerRepo {
       }
 
       boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey());
+
+      if (matchedPrevRow && firstTablet == null) {
+        firstTablet = currTablet;
+      }
+
       count = matchedPrevRow ? 1 : 0;
 
       while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) {
@@ -179,9 +190,11 @@ public class PrepBulkImport extends ManagerRepo {
                 + ") number of tablets: " + new TreeMap<>(fileCounts));
       }
     }
+
+    return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());
   }
 
-  private void checkForMerge(final long tid, final Manager manager) throws Exception {
+  private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception {
 
     VolumeManager fs = manager.getVolumeManager();
     final Path bulkDir = new Path(bulkInfo.sourceDir);
@@ -197,14 +210,23 @@ public class PrepBulkImport extends ManagerRepo {
               .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream()
               .map(TabletMetadata::getExtent).iterator();
 
-      sanityCheckLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid);
+      return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
+          tid);
     }
   }
 
   @Override
   public Repo<Manager> call(final long tid, final Manager manager) throws Exception {
     // now that table lock is acquired check that all splits in load mapping exists in table
-    checkForMerge(tid, manager);
+    KeyExtent tabletsRange = checkForMerge(tid, manager);
+
+    bulkInfo.firstSplit =
+        Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);
+    bulkInfo.lastSplit =
+        Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null);
+
+    log.trace("{} first split:{} last split:{}", FateTxId.formatTid(tid), 
tabletsRange.prevEndRow(),
+        tabletsRange.endRow());
 
     bulkInfo.tableState = manager.getContext().getTableState(bulkInfo.tableId);
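
validateLoadMapping tracks the first and last tablets that overlap the load mapping and collapses them into one covering extent, which is what later bounds the cleanup scan. With hypothetical tablets, the construction at its return statement works like this:

    // Hypothetical tablets overlapping the import. KeyExtent's constructor takes
    // (tableId, endRow, prevEndRow), so the covering extent runs from the first
    // tablet's prevEndRow (exclusive) to the last tablet's endRow (inclusive).
    KeyExtent firstTablet = new KeyExtent(TableId.of("1"), new Text("g"), new Text("c"));
    KeyExtent lastTablet = new KeyExtent(TableId.of("1"), new Text("t"), new Text("p"));
    KeyExtent covering =
        new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());
    // covering spans (c, t]: every tablet touched by the import, and no more.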
 
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index 0a98e0a8d7..62bbbd1402 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.manager.tableOps.bulkVer2;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -32,6 +33,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -111,8 +113,16 @@ public class PrepBulkImportTest {
       return tabletRanges.subList(start, tabletRanges.size()).iterator();
     };
 
+    var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList());
+    String minPrevEndRow =
+        Optional.ofNullable(sortedExtents.get(0).prevEndRow()).map(Text::toString).orElse(null);
+    String maxPrevEndRow = Optional.ofNullable(sortedExtents.get(sortedExtents.size() - 1).endRow())
+        .map(Text::toString).orElse(null);
+
     try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
-      PrepBulkImport.sanityCheckLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001);
+      var extent =
+          PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001);
+      assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges);
     }
   }
 
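The assertion derives the expected bounds straight from the load ranges: the smallest prevEndRow and the largest endRow of the sorted extents. nke(...) is a pre-existing helper in this test class; a hypothetical equivalent for table "1" would look like:

    // Hypothetical stand-in for the test's nke(prevEndRow, endRow) helper;
    // null represents an unbounded side of the extent.
    static KeyExtent nke(String prevEndRow, String endRow) {
      return new KeyExtent(TableId.of("1"), endRow == null ? null : new Text(endRow),
          prevEndRow == null ? null : new Text(prevEndRow));
    }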
