This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new cfae5e9abc fixes slow bulk import with many tablets and file (#5044)
cfae5e9abc is described below

commit cfae5e9abc7447aacc7136777f853fc0842c49de
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Nov 8 11:44:10 2024 -0500

    fixes slow bulk import with many tablets and file (#5044)
    
    * fixes slow bulk import with many tablets and files
    
    The bulk import code was reading all tablets in the bulk import range
    for each range being bulk imported. This resulted in O(N^2) metadata
    table scans which made really large bulk imports really slow.
    
    Added a new test that bulk imports thousands of files into thousands of
    tablets.  Running this test w/o the fixes in this PR the following time
    is seen for the fate step.
    
    ```
    DEBUG: Running LoadFiles.isReady() 
FATE:USER:6320e73d-e661-4c66-bf25-c0c27a0a79d5 took 289521 ms and returned 0
    ```
    
    With the fix in this PR, the following times are seen for the new test,
    so the time goes from 290s to 1.2s.
    
    ```
    DEBUG: Running LoadFiles.isReady() 
FATE:USER:18e52fc2-5876-4b01-ba7b-3b3c099a82be took 1225 ms and returned 0
    ```
    
    This bug does not seem to exist in 2.1 or 3.1.  The test was not run on
    those branches, though; it may be worthwhile to backport the test.
---
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  8 +++-
 .../apache/accumulo/test/functional/BulkNewIT.java | 56 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index fa657eb9d8..711387955d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -338,11 +338,15 @@ class LoadFiles extends ManagerRepo {
         
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow,
 null)
             .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, 
TIME).build()) {
 
+      // The tablet iterator and load mapping iterator are both iterating over 
data that is sorted
+      // in the same way. The two iterators are each independently advanced to 
find common points in
+      // the sorted data.
+      var tabletIter = tabletsMetadata.iterator();
+
       t1 = System.currentTimeMillis();
       while (lmi.hasNext()) {
         loadMapEntry = lmi.next();
-        List<TabletMetadata> tablets =
-            findOverlappingTablets(loadMapEntry.getKey(), 
tabletsMetadata.iterator());
+        List<TabletMetadata> tablets = 
findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
         loader.load(tablets, loadMapEntry.getValue());
       }
     }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index df256d2499..1a3439919a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -35,7 +35,9 @@ import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -49,6 +51,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -829,6 +832,59 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testManyTabletAndFiles() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      TreeSet<Text> splits = IntStream.range(1, 
9000).mapToObj(BulkNewIT::row).map(Text::new)
+          .collect(Collectors.toCollection(TreeSet::new));
+      c.tableOperations().addSplits(tableName, splits);
+
+      var executor = Executors.newFixedThreadPool(16);
+      var futures = new ArrayList<Future<?>>();
+
+      var loadPlanBuilder = LoadPlan.builder();
+      var rowsExpected = new HashSet<>();
+      var imports = IntStream.range(2, 
8999).boxed().collect(Collectors.toList());
+      // The order in which imports are added to the load plan should not 
matter so test that.
+      Collections.shuffle(imports);
+      for (var data : imports) {
+        String filename = "f" + data + ".";
+        loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, 
RangeType.TABLE, row(data - 1),
+            row(data));
+        var future = executor.submit(() -> {
+          writeData(dir + "/" + filename, aconf, data, data);
+          return null;
+        });
+        futures.add(future);
+        rowsExpected.add(row(data));
+      }
+
+      for (var future : futures) {
+        future.get();
+      }
+
+      executor.shutdown();
+
+      var loadPlan = loadPlanBuilder.build();
+
+      
c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+      // using a batch scanner can read from lots of tablets w/ less RPCs
+      try (var scanner = c.createBatchScanner(tableName)) {
+        // use a scan server so that tablets do not need to be hosted
+        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+        scanner.setRanges(List.of(new Range()));
+        var rowsSeen = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+            .collect(Collectors.toSet());
+        assertEquals(rowsExpected, rowsSeen);
+      }
+    }
+  }
+
   /**
    * @return Map w/ keys that are end rows of tablets and the value is a true 
when the tablet has a
    *         current location.

Reply via email to