This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new dc2109ad9c Recreate TabletsMetadata iterator when file ranges are not contiguous (#5341)
dc2109ad9c is described below

commit dc2109ad9cb8c0b6e0f4ea05c696b0429c8289b9
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Mar 28 12:22:39 2025 -0400

    Recreate TabletsMetadata iterator when file ranges are not contiguous (#5341)
    
    In the Bulk Import v2 LoadFiles step, a single TabletsMetadata
    object was used to map a table's tablets to a set of bulk import
    files. When only a small percentage of tablets were involved in
    the bulk import, a majority of the table's tablets would still be
    evaluated. When a bulk import did not target contiguous tablets,
    the code would simply iterate over the table's tablets until it
    found the next starting point.
    
    This change adds a new property that determines how many
    TabletMetadata objects to evaluate before recreating the
    TabletsMetadata object at the new start position. When bulk
    importing into a set of contiguous tablets, or into a majority
    of the tablets in a table, the value of this property should
    likely be zero, leaving the feature disabled. When importing
    into a non-contiguous set of tablets, this property can be used
    to speed up bulk import processing.
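
    As a minimal sketch, assuming the standard Accumulo client API
    (org.apache.accumulo.core.client) and a hypothetical table named
    "bulk_tbl", the feature could be enabled per table like this:

        // Hypothetical example: while loading bulk import files, recreate
        // the metadata scanner whenever the next needed tablet is more
        // than 10 tablets ahead of the current position.
        try (AccumuloClient client = Accumulo.newClient()
            .from("/path/to/accumulo-client.properties").build()) {
          client.tableOperations().setProperty("bulk_tbl",
              "table.bulk.metadata.skip.distance", "10");
        }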
    
    This change also closes the TabletsMetadata objects, which was not
    done previously.
    
    Related to #5201
    
    
    
    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
---
 .../org/apache/accumulo/core/conf/Property.java    |  12 ++
 .../apache/accumulo/core/util/PeekingIterator.java |  37 ++++
 .../accumulo/core/util/PeekingIteratorTest.java    |  88 ++++++++
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  32 ++-
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  | 154 ++++++++------
 .../manager/tableOps/bulkVer2/LoadFilesTest.java   |   2 +-
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |  76 ++++---
 .../apache/accumulo/test/functional/BulkNewIT.java |  51 +++--
 .../test/functional/BulkNewMetadataSkipIT.java     | 221 +++++++++++++++++++++
 9 files changed, 548 insertions(+), 125 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ff0cad4bab..39955d0998 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1220,6 +1220,18 @@ public enum Property {
       "The maximum number of tablets allowed for one bulk import file. Value 
of 0 is Unlimited. "
           + "This property is only enforced in the new bulk import API.",
       "2.1.0"),
+  TABLE_BULK_SKIP_THRESHOLD("table.bulk.metadata.skip.distance", "0", 
PropertyType.COUNT,
+      "When performing bulk v2 imports to a table, the Manager iterates over 
the tables metadata"
+          + " tablets sequentially. When importing files into a small table or 
into all or a majority"
+          + " of tablets of a large table then the tablet metadata information 
for most tablets will be needed."
+          + " However, when importing files into a small number of 
non-contiguous tablets in a large table, then"
+          + " the Manager will look at each tablets metadata when it could be 
skipped. The value of this"
+          + " property tells the Manager if, and when, it should set up a new 
scanner over the metadata"
+          + " table instead of just iterating over tablet metadata to find the 
matching tablet. Setting up"
+          + " a new scanner is analogous to performing a seek in an iterator, 
but it has a cost. A value of zero (default) disables"
+          + " this feature. A non-zero value enables this feature and the 
Manager will setup a new scanner"
+          + " when the tablet metadata distance is above the supplied value.",
+      "2.1.4"),
   TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY,
       "The durability used to write to the write-ahead log. Legal values are:"
           + " none, which skips the write-ahead log; log, which sends the data 
to the"
diff --git a/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java b/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java
index f410174e73..c7b85ec159 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java
@@ -19,6 +19,9 @@
 package org.apache.accumulo.core.util;
 
 import java.util.Iterator;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
 
 public class PeekingIterator<E> implements Iterator<E> {
 
@@ -91,4 +94,38 @@ public class PeekingIterator<E> implements Iterator<E> {
     }
     return top != null;
   }
+
+  /**
+   * Advances the underlying iterator looking for a match, inspecting up to {@code limit} elements
+   * from the iterator. If this method finds a match to the predicate, then it will return true and
+   * will be positioned before the matching element (peek() and next() will return the matching
+   * element). If this method does not find a match because the underlying iterator ended before
+   * {@code limit}, then it will return false and hasNext will also return false. Otherwise, if this
+   * method does not find a match, then it will return false and be positioned before the limit
+   * element (peek() and next() will return the {@code limit} element).
+   *
+   * @param predicate condition that we are looking for; the element passed to the predicate could
+   *        be null, so the Predicate implementation needs to handle this.
+   * @param limit number of elements to inspect while looking for a match; must be a positive int
+   * @return true if an element matched the predicate or false otherwise. When true hasNext() will
+   *         return true and peek() and next() will return the matching element. When false
+   *         hasNext() may return false if the end has been reached, or hasNext() may return true in
+   *         which case peek() and next() will return the element {@code limit} positions ahead of
+   *         where this iterator was before this method was called.
+   */
+  public boolean findWithin(Predicate<E> predicate, int limit) {
+    Preconditions.checkArgument(limit > 0);
+    for (int i = 0; i < limit; i++) {
+      if (predicate.test(peek())) {
+        return true;
+      } else if (i < (limit - 1)) {
+        if (hasNext()) {
+          next();
+        } else {
+          return false;
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java
new file mode 100644
index 0000000000..27bf76a725
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Iterator;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.Test;
+
+public class PeekingIteratorTest {
+
+  @Test
+  public void testPeek() {
+    Iterator<Integer> ints = IntStream.range(1, 11).iterator();
+    PeekingIterator<Integer> peek = new PeekingIterator<>(ints);
+
+    assertEquals(1, peek.peek());
+
+    for (int i = 1; i < 11; i++) {
+      assertTrue(peek.hasNext());
+      assertEquals(i, peek.next());
+      if (i == 10) {
+        assertNull(peek.peek());
+      } else {
+        assertEquals(i + 1, peek.peek());
+      }
+    }
+
+    assertFalse(peek.hasNext());
+    assertNull(peek.next());
+  }
+
+  @Test
+  public void testFind() {
+
+    Iterator<Integer> ints = IntStream.range(1, 11).iterator();
+    PeekingIterator<Integer> peek = new PeekingIterator<>(ints);
+
+    assertThrows(IllegalArgumentException.class, () -> peek.findWithin(e -> false, -1));
+    assertEquals(1, peek.peek());
+    assertTrue(peek.findWithin((x) -> x != null && x == 4, 5));
+    assertTrue(peek.hasNext());
+    assertEquals(4, peek.next());
+    assertEquals(5, peek.peek());
+
+    // Advance the iterator 2 times looking for 7.
+    // This will return false, but will advance
+    // twice leaving the iterator at 6.
+    assertFalse(peek.findWithin((x) -> x != null && x == 7, 2));
+
+    assertTrue(peek.hasNext());
+    assertEquals(6, peek.peek());
+    assertEquals(6, peek.next());
+
+    assertTrue(peek.findWithin((x) -> x != null && x == 8, 2));
+    assertTrue(peek.hasNext());
+    assertEquals(8, peek.next());
+
+    // Try to advance past the end
+    assertFalse(peek.findWithin((x) -> x != null && x == 7, 3));
+    assertFalse(peek.hasNext());
+    assertNull(peek.next());
+
+  }
+
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index dfa4b98278..abed9684aa 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -123,7 +123,9 @@ class LoadFiles extends ManagerRepo {
           .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency()
           .fetch(PREV_ROW, LOCATION, LOADED).build();
 
-      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid);
+      int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+          .getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
+      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid, skip);
     }
   }
 
@@ -352,8 +354,8 @@ class LoadFiles extends ManagerRepo {
    */
   // visible for testing
   static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir,
-      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid)
-      throws Exception {
+      LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid,
+      int skipDistance) throws Exception {
     PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
     Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek();
 
@@ -366,15 +368,33 @@ class LoadFiles extends ManagerRepo {
 
     ImportTimingStats importTimingStats = new ImportTimingStats();
     Timer timer = Timer.startNew();
-    try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) {
 
-      Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator();
+    TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow);
+    try {
+      PeekingIterator<TabletMetadata> pi = new PeekingIterator<>(tabletsMetadata.iterator());
       while (lmi.hasNext()) {
         loadMapEntry = lmi.next();
+        // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look
+        // at the next skipDistance tablets before recreating the iterator
+        if (skipDistance > 0) {
+          final KeyExtent loadMapKey = loadMapEntry.getKey();
+          if (!pi.findWithin(
+              tm -> PREV_COMP.compare(tm.getPrevEndRow(), loadMapKey.prevEndRow()) >= 0,
+              skipDistance)) {
+            log.debug(
+                "Next load mapping range {} not found in {} tablets, recreating TabletMetadata to jump ahead",
+                loadMapKey.prevEndRow(), skipDistance);
+            tabletsMetadata.close();
+            tabletsMetadata = factory.newTabletsMetadata(loadMapKey.prevEndRow());
+            pi = new PeekingIterator<>(tabletsMetadata.iterator());
+          }
+        }
         List<TabletMetadata> tablets =
-            findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats);
+            findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats);
         loader.load(tablets, loadMapEntry.getValue());
       }
+    } finally {
+      tabletsMetadata.close();
     }
     Duration totalProcessingTime = timer.elapsed();
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 56116365b4..a8689034c0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
@@ -104,6 +105,8 @@ public class PrepBulkImport extends ManagerRepo {
   @VisibleForTesting
   interface TabletIterFactory {
     Iterator<KeyExtent> newTabletIter(Text startRow);
+
+    void close();
   }
 
   private static boolean equals(Function<KeyExtent,Text> extractor, KeyExtent 
ke1, KeyExtent ke2) {
@@ -116,82 +119,100 @@ public class PrepBulkImport extends ManagerRepo {
    */
   @VisibleForTesting
   static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,
-      TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
+      TabletIterFactory tabletIterFactory, int maxNumTablets, long tid, int skip) throws Exception {
+
     var currRange = lmi.next();
 
     Text startRow = currRange.getKey().prevEndRow();
 
-    Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
+    PeekingIterator<KeyExtent> pi =
+        new PeekingIterator<>(tabletIterFactory.newTabletIter(startRow));
 
-    KeyExtent currTablet = tabletIter.next();
+    try {
+      KeyExtent currTablet = pi.next();
 
-    var fileCounts = new HashMap<String,Integer>();
-    int count;
+      var fileCounts = new HashMap<String,Integer>();
+      int count;
 
-    KeyExtent firstTablet = currRange.getKey();
-    KeyExtent lastTablet = currRange.getKey();
+      KeyExtent firstTablet = currRange.getKey();
+      KeyExtent lastTablet = currRange.getKey();
 
-    if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, 
currRange.getKey())
-        && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
-      currRange = null;
-    }
+      if (!pi.hasNext() && equals(KeyExtent::prevEndRow, currTablet, 
currRange.getKey())
+          && equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
+        currRange = null;
+      }
 
-    while (tabletIter.hasNext()) {
+      while (pi.hasNext()) {
 
-      if (currRange == null) {
-        if (!lmi.hasNext()) {
-          break;
+        if (currRange == null) {
+          if (!lmi.hasNext()) {
+            break;
+          }
+          currRange = lmi.next();
+          lastTablet = currRange.getKey();
+        }
+        // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look
+        // at the next skipDistance tablets before recreating the iterator
+        if (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) && skip > 0
+            && currRange.getKey().prevEndRow() != null) {
+          final KeyExtent search = currRange.getKey();
+          if (!pi.findWithin((ke) -> Objects.equals(ke.prevEndRow(), search.prevEndRow()), skip)) {
+            log.debug(
+                "Tablet metadata for prevEndRow {} not found in {} tablets from current tablet {}, recreating TabletMetadata to jump ahead",
+                search.prevEndRow(), skip, currTablet);
+            tabletIterFactory.close();
+            pi = new PeekingIterator<>(tabletIterFactory.newTabletIter(search.prevEndRow()));
+            currTablet = pi.next();
+          }
+        }
+        while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) 
&& pi.hasNext()) {
+          currTablet = pi.next();
         }
-        currRange = lmi.next();
-        lastTablet = currRange.getKey();
-      }
 
-      while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
-          && tabletIter.hasNext()) {
-        currTablet = tabletIter.next();
-      }
+        boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, 
currRange.getKey());
 
-      boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, 
currRange.getKey());
+        if (matchedPrevRow && firstTablet == null) {
+          firstTablet = currTablet;
+        }
 
-      if (matchedPrevRow && firstTablet == null) {
-        firstTablet = currTablet;
-      }
+        count = matchedPrevRow ? 1 : 0;
 
-      count = matchedPrevRow ? 1 : 0;
+        while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && 
pi.hasNext()) {
+          currTablet = pi.next();
+          count++;
+        }
 
-      while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && 
tabletIter.hasNext()) {
-        currTablet = tabletIter.next();
-        count++;
-      }
+        if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, 
currRange.getKey())) {
+          break;
+        }
 
-      if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, 
currRange.getKey())) {
-        break;
+        if (maxNumTablets > 0) {
+          int fc = count;
+          currRange.getValue()
+              .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), 
fc, Integer::sum));
+        }
+        currRange = null;
       }
 
-      if (maxNumTablets > 0) {
-        int fc = count;
-        currRange.getValue()
-            .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, 
Integer::sum));
+      if (currRange != null || lmi.hasNext()) {
+        // merge happened after the mapping was generated and before the table 
lock was acquired
+        throw new AcceptableThriftTableOperationException(tableId, null, 
TableOperation.BULK_IMPORT,
+            TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent 
merge happened");
       }
-      currRange = null;
-    }
-
-    if (currRange != null || lmi.hasNext()) {
-      // merge happened after the mapping was generated and before the table 
lock was acquired
-      throw new AcceptableThriftTableOperationException(tableId, null, 
TableOperation.BULK_IMPORT,
-          TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge 
happened");
-    }
 
-    if (maxNumTablets > 0) {
-      fileCounts.values().removeIf(c -> c <= maxNumTablets);
-      if (!fileCounts.isEmpty()) {
-        throw new AcceptableThriftTableOperationException(tableId, null, 
TableOperation.BULK_IMPORT,
-            TableOperationExceptionType.OTHER, "Files overlap the configured 
max (" + maxNumTablets
-                + ") number of tablets: " + new TreeMap<>(fileCounts));
+      if (maxNumTablets > 0) {
+        fileCounts.values().removeIf(c -> c <= maxNumTablets);
+        if (!fileCounts.isEmpty()) {
+          throw new AcceptableThriftTableOperationException(tableId, null,
+              TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
+              "Files overlap the configured max (" + maxNumTablets + ") number 
of tablets: "
+                  + new TreeMap<>(fileCounts));
+        }
       }
+      return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), 
firstTablet.prevEndRow());
+    } finally {
+      tabletIterFactory.close();
     }
-
-    return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), 
firstTablet.prevEndRow());
   }
 
   private KeyExtent checkForMerge(final long tid, final Manager manager) 
throws Exception {
@@ -205,13 +226,29 @@ public class PrepBulkImport extends ManagerRepo {
     try (LoadMappingIterator lmi =
         BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, 
fs::open)) {
 
-      TabletIterFactory tabletIterFactory =
-          startRow -> TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
-              .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream()
-              .map(TabletMetadata::getExtent).iterator();
+      TabletIterFactory tabletIterFactory = new TabletIterFactory() {
+
+        TabletsMetadata tm = null;
+
+        @Override
+        public Iterator<KeyExtent> newTabletIter(Text startRow) {
+          tm = TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
+              .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build();
+          return tm.stream().map(TabletMetadata::getExtent).iterator();
+        }
 
+        @Override
+        public void close() {
+          if (tm != null) {
+            tm.close();
+          }
+        }
+      };
+
+      int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+          .getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
       return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
-          tid);
+          tid, skip);
     }
   }
 
@@ -224,7 +261,6 @@ public class PrepBulkImport extends ManagerRepo {
         
Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null);
     bulkInfo.lastSplit =
         
Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null);
-
     log.trace("{} first split:{} last split:{}", FateTxId.formatTid(tid), 
tabletsRange.prevEndRow(),
         tabletsRange.endRow());
 
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
index 9afa0ceb57..31d8cfd828 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
@@ -162,7 +162,7 @@ public class LoadFilesTest {
     Path bulkDir = EasyMock.createMock(Path.class);
     EasyMock.replay(manager, bulkDir);
 
-    LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid);
+    LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid, 0);
     EasyMock.verify(manager, bulkDir);
     List<CaptureLoader.LoadResult> results = cl.getLoadResults();
     assertEquals(loadRanges.size(), results.size());
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index 9ff5945e21..5fb2d8181f 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -88,29 +89,37 @@ public class PrepBulkImportTest {
     }).iterator();
   }
 
-  private void runTest(List<KeyExtent> loadRanges, List<KeyExtent> 
tabletRanges) throws Exception {
+  private void runTest(List<KeyExtent> loadRanges, List<KeyExtent> 
tabletRanges, int skipDistance)
+      throws Exception {
     Map<KeyExtent,String> lrm = new HashMap<>();
     loadRanges.forEach(e -> lrm.put(e, "f1 f2 f3"));
-    runTest(lrm, tabletRanges, 100);
+    runTest(lrm, tabletRanges, 100, skipDistance);
   }
 
   public void runTest(Map<KeyExtent,String> loadRanges, List<KeyExtent> 
tabletRanges,
-      int maxTablets) throws Exception {
-    TabletIterFactory tabletIterFactory = startRow -> {
-      int start = -1;
-
-      if (startRow == null) {
-        start = 0;
-      } else {
-        for (int i = 0; i < tabletRanges.size(); i++) {
-          if (tabletRanges.get(i).contains(startRow)) {
-            start = i;
-            break;
+      int maxTablets, int skipDistance) throws Exception {
+    TabletIterFactory tabletIterFactory = new TabletIterFactory() {
+
+      @Override
+      public Iterator<KeyExtent> newTabletIter(Text startRow) {
+        int start = -1;
+
+        if (startRow == null) {
+          start = 0;
+        } else {
+          for (int i = 0; i < tabletRanges.size(); i++) {
+            if (tabletRanges.get(i).contains(startRow)) {
+              start = i;
+              break;
+            }
           }
         }
+
+        return tabletRanges.subList(start, tabletRanges.size()).iterator();
       }
 
-      return tabletRanges.subList(start, tabletRanges.size()).iterator();
+      @Override
+      public void close() {}
     };
 
     var sortedExtents = 
loadRanges.keySet().stream().sorted().collect(Collectors.toList());
@@ -120,8 +129,8 @@ public class PrepBulkImportTest {
         .map(Text::toString).orElse(null);
 
     try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
-      var extent =
-          PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, 
maxTablets, 10001);
+      var extent = PrepBulkImport.validateLoadMapping("1", lmi, 
tabletIterFactory, maxTablets,
+          10001, skipDistance);
       assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " 
+ tabletRanges);
     }
   }
@@ -154,23 +163,24 @@ public class PrepBulkImportTest {
         .collect(Collectors.joining(","));
   }
 
-  public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> 
tabletRanges) {
+  public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> 
tabletRanges,
+      int skipDistance) {
     String message = "expected " + toRangeStrings(loadRanges) + " to fail 
against "
         + toRangeStrings(tabletRanges);
     assertThrows(AcceptableThriftTableOperationException.class,
-        () -> runTest(loadRanges, tabletRanges), message);
+        () -> runTest(loadRanges, tabletRanges, skipDistance), message);
   }
 
   @Test
   public void testSingleTablet() throws Exception {
-    runTest(Arrays.asList(nke(null, null)), Arrays.asList(nke(null, null)));
+    runTest(Arrays.asList(nke(null, null)), Arrays.asList(nke(null, null)), 0);
 
     for (List<KeyExtent> loadRanges : powerSet(nke(null, "b"), nke("b", "k"), 
nke("k", "r"),
         nke("r", null))) {
       if (loadRanges.isEmpty()) {
         continue;
       }
-      runExceptionTest(loadRanges, Arrays.asList(nke(null, null)));
+      runExceptionTest(loadRanges, Arrays.asList(nke(null, null)), 0);
     }
   }
 
@@ -186,8 +196,8 @@ public class PrepBulkImportTest {
 
       List<String> requiredRows = List.of("b", "m", "r", "v");
       for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", 
"x"))) {
-        runTest(loadRanges,
-            createExtents(Stream.concat(requiredRows.stream(), 
otherRows.stream())));
+        runTest(loadRanges, createExtents(Stream.concat(requiredRows.stream(), 
otherRows.stream())),
+            0);
       }
     }
   }
@@ -217,14 +227,14 @@ public class PrepBulkImportTest {
         // test will all but one of the rows in the load mapping
         for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", 
"x"))) {
           runExceptionTest(loadRanges,
-              createExtents(Stream.concat(rows2.stream(), 
otherRows.stream())));
+              createExtents(Stream.concat(rows2.stream(), 
otherRows.stream())), 0);
         }
       }
 
       if (rows.size() > 1) {
         // test with none of the rows in the load mapping
         for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", 
"x"))) {
-          runExceptionTest(loadRanges, createExtents(otherRows.stream()));
+          runExceptionTest(loadRanges, createExtents(otherRows.stream()), 0);
         }
       }
     }
@@ -250,14 +260,14 @@ public class PrepBulkImportTest {
         int totalTablets = requiredRows.size() + otherRows.size() + 1;
 
         if (totalTablets > maxTablets) {
-          runTooManyTest(loadRanges, tablets, "{f2=" + totalTablets + "}", 
maxTablets);
+          runTooManyTest(loadRanges, tablets, "{f2=" + totalTablets + "}", 
maxTablets, 2);
         } else {
-          runTest(loadRanges, createExtents(tablets), maxTablets);
+          runTest(loadRanges, createExtents(tablets), maxTablets, 2);
         }
       }
 
       runTest(loadRanges, createExtents(Stream.concat(requiredRows.stream(), 
otherRows.stream())),
-          0);
+          0, 2);
     }
 
     loadRanges.clear();
@@ -267,20 +277,20 @@ public class PrepBulkImportTest {
     loadRanges.put(nke("re", "rz"), "f4");
 
     runTooManyTest(loadRanges, Stream.of("ca", "cd", "cz", "e", "ma", "md", 
"mm", "re", "rz"),
-        "{f3=4}", 3);
+        "{f3=4}", 3, 2);
     runTooManyTest(loadRanges, Stream.of("b", "ca", "cd", "cz", "e", "ma", 
"md", "mm", "re", "rz"),
-        "{f3=4}", 3);
+        "{f3=4}", 3, 2);
     runTooManyTest(loadRanges,
         Stream.of("ca", "cd", "cz", "e", "ma", "md", "mm", "re", "rf", "rh", 
"rm", "rz"),
-        "{f3=4, f4=4}", 3);
+        "{f3=4, f4=4}", 3, 2);
     runTooManyTest(loadRanges,
-        Stream.of("ca", "cd", "cz", "e", "ma", "mm", "re", "rf", "rh", "rm", 
"rz"), "{f4=4}", 3);
+        Stream.of("ca", "cd", "cz", "e", "ma", "mm", "re", "rf", "rh", "rm", 
"rz"), "{f4=4}", 3, 2);
   }
 
   private void runTooManyTest(Map<KeyExtent,String> loadRanges, Stream<String> 
tablets,
-      String expectedMessage, int maxTablets) {
+      String expectedMessage, int maxTablets, int skipDistance) {
     var exception = assertThrows(ThriftTableOperationException.class,
-        () -> runTest(loadRanges, createExtents(tablets), maxTablets));
+        () -> runTest(loadRanges, createExtents(tablets), maxTablets, 
skipDistance));
     String message = exception.toString();
     assertTrue(exception.toString().contains(expectedMessage), expectedMessage 
+ " -- " + message);
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 683461b8c3..1cb6bbf953 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -169,7 +169,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
     String dir = getDir("/testSingleTabletSingleFileNoSplits-");
 
-    String h1 = writeData(dir + "/f1.", aconf, 0, 332);
+    String h1 = writeData(fs, dir + "/f1.", aconf, 0, 332);
 
     
c.tableOperations().importDirectory(dir).to(tableName).tableTime(setTime).load();
     // running again with ignoreEmptyDir set to true will not throw an 
exception
@@ -258,7 +258,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
     String dir = getDir("/testSingleTabletSingleFileNoSplits-");
 
-    String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+    String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333);
 
     c.tableOperations().importDirectory(dir).to(tableName).load();
 
@@ -291,7 +291,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
       String dir = getDir("/testBadPermissions-");
 
-      writeData(dir + "/f1.", aconf, 0, 333);
+      writeData(fs, dir + "/f1.", aconf, 0, 333);
 
       Path rFilePath = new Path(dir, "f1." + RFile.EXTENSION);
       FsPermission originalPerms = fs.getFileStatus(rFilePath).getPermission();
@@ -339,21 +339,21 @@ public class BulkNewIT extends SharedMiniClusterBase {
       out.close();
 
       // 1 Tablet 0333-null
-      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333);
       hashes.get("0333").add(h1);
 
       // 2 Tablets 0666-0334, 0999-0667
-      String h2 = writeData(dir + "/f2.", aconf, 334, 999);
+      String h2 = writeData(fs, dir + "/f2.", aconf, 334, 999);
       hashes.get("0666").add(h2);
       hashes.get("0999").add(h2);
 
       // 2 Tablets 1333-1000, 1666-1334
-      String h3 = writeData(dir + "/f3.", aconf, 1000, 1499);
+      String h3 = writeData(fs, dir + "/f3.", aconf, 1000, 1499);
       hashes.get("1333").add(h3);
       hashes.get("1666").add(h3);
 
       // 2 Tablets 1666-1334, >1666
-      String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+      String h4 = writeData(fs, dir + "/f4.", aconf, 1500, 1999);
       hashes.get("1666").add(h4);
       hashes.get("null").add(h4);
 
@@ -393,21 +393,21 @@ public class BulkNewIT extends SharedMiniClusterBase {
       out.close();
 
       // 1 Tablet 0333-null
-      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333);
       hashes.get("0333").add(h1);
 
       // 3 Tablets 0666-0334, 0999-0667, 1333-1000
-      String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333);
+      String h2 = writeData(fs, dir + "/bad-file.", aconf, 334, 1333);
       hashes.get("0666").add(h2);
       hashes.get("0999").add(h2);
       hashes.get("1333").add(h2);
 
       // 1 Tablet 1666-1334
-      String h3 = writeData(dir + "/f3.", aconf, 1334, 1499);
+      String h3 = writeData(fs, dir + "/f3.", aconf, 1334, 1499);
       hashes.get("1666").add(h3);
 
       // 2 Tablets 1666-1334, >1666
-      String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+      String h4 = writeData(fs, dir + "/f4.", aconf, 1500, 1999);
       hashes.get("1666").add(h4);
       hashes.get("null").add(h4);
 
@@ -453,8 +453,8 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
       String dir = getDir("/testBulkFile-");
 
-      writeData(dir + "/f1.", aconf, 0, 333);
-      writeData(dir + "/f2.", aconf, 0, 666);
+      writeData(fs, dir + "/f1.", aconf, 0, 333);
+      writeData(fs, dir + "/f2.", aconf, 0, 666);
 
       final var importMappingOptions = 
c.tableOperations().importDirectory(dir).to(tableName);
 
@@ -487,12 +487,12 @@ public class BulkNewIT extends SharedMiniClusterBase {
       String dir = getDir("/testBulkFile-");
 
       Map<String,Set<String>> hashes = new HashMap<>();
-      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333);
       hashes.put("0333", new HashSet<>(List.of(h1)));
-      String h2 = writeData(dir + "/f2.", aconf, 0, 666);
+      String h2 = writeData(fs, dir + "/f2.", aconf, 0, 666);
       hashes.get("0333").add(h2);
       hashes.put("0666", new HashSet<>(List.of(h2)));
-      String h3 = writeData(dir + "/f3.", aconf, 334, 700);
+      String h3 = writeData(fs, dir + "/f3.", aconf, 334, 700);
       hashes.get("0666").add(h3);
       hashes.put("0999", new HashSet<>(List.of(h3)));
       hashes.put("1333", Set.of());
@@ -571,7 +571,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
       addSplits(c, tableName, "0333");
 
-      var h1 = writeData(dir + "/f1.", aconf, 333, 333);
+      var h1 = writeData(fs, dir + "/f1.", aconf, 333, 333);
 
       c.tableOperations().importDirectory(dir).to(tableName).load();
 
@@ -593,7 +593,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
       String dir = getDir("/testExceptionInMetadataUpdate-");
 
-      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333);
 
       var executor = Executors.newSingleThreadExecutor();
       // With the constraint configured that makes tservers throw an exception 
on bulk import, the
@@ -630,8 +630,8 @@ public class BulkNewIT extends SharedMiniClusterBase {
     client.tableOperations().addSplits(tableName, splits);
   }
 
-  private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime)
-      throws Exception {
+  private static void verifyData(AccumuloClient client, String table, int start, int end,
+      boolean setTime) throws Exception {
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
 
       Iterator<Entry<Key,Value>> iter = scanner.iterator();
@@ -664,7 +664,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
-  private void verifyMetadata(AccumuloClient client, String tableName,
+  public static void verifyMetadata(AccumuloClient client, String tableName,
       Map<String,Set<String>> expectedHashes) {
 
     Set<String> endRowsSeen = new HashSet<>();
@@ -691,7 +691,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
   @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", 
"WEAK_MESSAGE_DIGEST_SHA1"},
       justification = "path provided by test; sha-1 is okay for test")
-  private String hash(String filename) {
+  public static String hash(String filename) {
     try {
       byte[] data = 
Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", "")));
       byte[] hash = MessageDigest.getInstance("SHA1").digest(data);
@@ -701,13 +701,12 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
-  private static String row(int r) {
+  public static String row(int r) {
     return String.format("%04d", r);
   }
 
-  private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
-      throws Exception {
-    FileSystem fs = getCluster().getFileSystem();
+  public static String writeData(FileSystem fs, String file, AccumuloConfiguration aconf, int s,
+      int e) throws Exception {
     String filename = file + RFile.EXTENSION;
     try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
         .forFile(filename, fs, fs.getConf(), NoCryptoServiceFactory.NONE)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java
new file mode 100644
index 0000000000..8f286d8f7c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.test.functional.BulkNewIT.hash;
+import static org.apache.accumulo.test.functional.BulkNewIT.row;
+import static org.apache.accumulo.test.functional.BulkNewIT.verifyMetadata;
+import static org.apache.accumulo.test.functional.BulkNewIT.writeData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * This test creates a table with 1000 splits and then imports files into a sparse set of the
+ * tablets. This test also splits the metadata table such that the tablet metadata for each tablet
+ * of the test table is in its own metadata tablet. The test then runs with different values for the
+ * TABLE_BULK_SKIP_THRESHOLD property starting with zero (disabled) then increasing.
+ *
+ * This test uses AccumuloClusterHarness instead of SharedMiniClusterBase so that we don't have to
+ * re-merge the metadata table and delete the test table. Doing these two things, and then waiting
+ * for balancing, takes a long time. It's faster to just start with a clean instance for each test
+ * run.
+ */
+public class BulkNewMetadataSkipIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE);
+    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "3s");
+    cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "25");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  public static String writeNonContiguousData(FileSystem fs, String file,
+      AccumuloConfiguration aconf, int[] rows) throws Exception {
+    String filename = file + RFile.EXTENSION;
+    try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(filename, fs, fs.getConf(), NoCryptoServiceFactory.NONE)
+        .withTableConfiguration(aconf).build()) {
+      writer.startDefaultLocalityGroup();
+      for (int i : rows) {
+        writer.append(new Key(new Text(row(i))), new 
Value(Integer.toString(i)));
+      }
+    }
+    return hash(filename);
+  }
+
+  @BeforeEach
+  @Override
+  public void setupCluster() throws Exception {
+    super.setupCluster();
+    // prime the zk connection
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {}
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {0, 0, 2, 4, 8, 16, 32, 64, 128})
+  public void test(int skipDistance) throws Exception {
+
+    final String tableName = getUniqueNames(1)[0] + "_" + skipDistance;
+    final AccumuloConfiguration aconf = 
getCluster().getServerContext().getConfiguration();
+    final FileSystem fs = getCluster().getFileSystem();
+    final String rootPath = getCluster().getTemporaryPath().toString();
+    final String dir = rootPath + "/" + tableName;
+
+    fs.delete(new Path(dir), true);
+
+    final SortedSet<Text> splits = new TreeSet<>();
+    IntStream.rangeClosed(0, 1000).forEach(i -> splits.add(new 
Text(String.format("%04d", i))));
+
+    final NewTableConfiguration ntc = new NewTableConfiguration();
+    ntc.setProperties(
+        Map.of(Property.TABLE_BULK_SKIP_THRESHOLD.getKey(), 
Integer.toString(skipDistance)));
+    ntc.withSplits(splits);
+
+    final Map<String,Set<String>> hashes = new HashMap<>();
+    IntStream.rangeClosed(0, 1000).forEach(i -> hashes.put(row(i), new 
HashSet<>()));
+    hashes.put("null", new HashSet<>());
+
+    String h1 = writeData(fs, dir + "/f1.", aconf, 0, 11);
+    IntStream.rangeClosed(0, 11).forEach(i -> hashes.get(row(i)).add(h1));
+
+    int[] h2Rows = new int[] {11, 199, 200, 204};
+    String h2 = writeNonContiguousData(fs, dir + "/f2.", aconf, h2Rows);
+    for (int i : h2Rows) {
+      hashes.get(row(i)).add(h2);
+    }
+
+    int[] h3Rows = new int[] {13, 200, 272, 273};
+    String h3 = writeNonContiguousData(fs, dir + "/f3.", aconf, h3Rows);
+    for (int i : h3Rows) {
+      hashes.get(row(i)).add(h3);
+    }
+
+    int[] h4Rows = new int[] {300, 301, 672, 998};
+    String h4 = writeNonContiguousData(fs, dir + "/f4.", aconf, h4Rows);
+    for (int i : h4Rows) {
+      hashes.get(row(i)).add(h4);
+    }
+
+    final LoadPlan loadPlan =
+        LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, row(0), row(11))
+            .loadFileTo("f2.rf", RangeType.TABLE, row(10), row(11))
+            .loadFileTo("f2.rf", RangeType.FILE, row(199), row(200))
+            .loadFileTo("f2.rf", RangeType.TABLE, row(203), row(204))
+            .loadFileTo("f3.rf", RangeType.TABLE, row(12), row(13))
+            .loadFileTo("f3.rf", RangeType.TABLE, row(199), row(200))
+            .loadFileTo("f3.rf", RangeType.FILE, row(272), row(273))
+            .loadFileTo("f4.rf", RangeType.FILE, row(300), row(301))
+            .loadFileTo("f4.rf", RangeType.TABLE, row(671), row(672))
+            .loadFileTo("f4.rf", RangeType.TABLE, row(997), row(998)).build();
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      c.tableOperations().create(tableName, ntc);
+      TableId tid = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      final SortedSet<Text> metadataSplits = new TreeSet<>();
+      Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      final String mdTablePrefix = tid.canonical() + ";";
+      s.forEach(e -> {
+        final String row = e.getKey().getRow().toString();
+        if (row.startsWith(mdTablePrefix)) {
+          metadataSplits.add(new Text(row + "\\x00"));
+        }
+      });
+      c.tableOperations().addSplits(MetadataTable.NAME, metadataSplits);
+
+      
c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+      verifyData(c, tableName, new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
11, 13, 199, 200, 204,
+          272, 273, 300, 301, 672, 998}, false);
+      verifyMetadata(c, tableName, hashes);
+    }
+  }
+
+  public static void verifyData(AccumuloClient client, String table, int[] 
expectedRows,
+      boolean setTime) throws Exception {
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+      int count = 0;
+      while (iter.hasNext()) {
+        Entry<Key,Value> entry = iter.next();
+
+        String expectedRow = String.format("%04d", expectedRows[count]);
+
+        if (!entry.getKey().getRow().equals(new Text(expectedRow))) {
+          throw new Exception("unexpected row " + entry.getKey() + " " + 
expectedRow);
+        }
+
+        if (Integer.parseInt(entry.getValue().toString()) != 
expectedRows[count]) {
+          throw new Exception("unexpected value " + entry + " " + 
expectedRows[count]);
+        }
+
+        if (setTime) {
+          assertEquals(1L, entry.getKey().getTimestamp());
+        }
+
+        count++;
+      }
+    }
+  }
+
+}

