This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new cfae5e9abc fixes slow bulk import with many tablets and file (#5044) cfae5e9abc is described below commit cfae5e9abc7447aacc7136777f853fc0842c49de Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Nov 8 11:44:10 2024 -0500 fixes slow bulk import with many tablets and file (#5044) * fixes slow bulk import with many tablets and file The bulk import code was reading all tablets in the bulk import range for each range being bulk imported. This resulted in O(N^2) metadata table scans which made really large bulk imports really slow. Added a new test that bulk imports thousands of files into thousands of tablets. Running this test w/o the fixes in this PR, the following time is seen for the fate step. ``` DEBUG: Running LoadFiles.isReady() FATE:USER:6320e73d-e661-4c66-bf25-c0c27a0a79d5 took 289521 ms and returned 0 ``` With the fix in this PR, the following times are seen for the new test, so it goes from 290s to 1.2s. ``` DEBUG: Running LoadFiles.isReady() FATE:USER:18e52fc2-5876-4b01-ba7b-3b3c099a82be took 1225 ms and returned 0 ``` This bug does not seem to exist in 2.1 or 3.1. Did not run the test there though; it may be worthwhile to backport the test. 
--- .../manager/tableOps/bulkVer2/LoadFiles.java | 8 +++- .../apache/accumulo/test/functional/BulkNewIT.java | 56 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index fa657eb9d8..711387955d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -338,11 +338,15 @@ class LoadFiles extends ManagerRepo { TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null) .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build()) { + // The tablet iterator and load mapping iterator are both iterating over data that is sorted + // in the same way. The two iterators are each independently advanced to find common points in + // the sorted data. 
+ var tabletIter = tabletsMetadata.iterator(); + t1 = System.currentTimeMillis(); while (lmi.hasNext()) { loadMapEntry = lmi.next(); - List<TabletMetadata> tablets = - findOverlappingTablets(loadMapEntry.getKey(), tabletsMetadata.iterator()); + List<TabletMetadata> tablets = findOverlappingTablets(loadMapEntry.getKey(), tabletIter); loader.load(tablets, loadMapEntry.getValue()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index df256d2499..1a3439919a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -35,7 +35,9 @@ import java.nio.file.Paths; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -49,6 +51,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -829,6 +832,59 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + @Test + public void testManyTabletAndFiles() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + c.tableOperations().addSplits(tableName, splits); + + var executor = Executors.newFixedThreadPool(16); + var futures = new 
ArrayList<Future<?>>(); + + var loadPlanBuilder = LoadPlan.builder(); + var rowsExpected = new HashSet<>(); + var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList()); + // The order in which imports are added to the load plan should not matter so test that. + Collections.shuffle(imports); + for (var data : imports) { + String filename = "f" + data + "."; + loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1), + row(data)); + var future = executor.submit(() -> { + writeData(dir + "/" + filename, aconf, data, data); + return null; + }); + futures.add(future); + rowsExpected.add(row(data)); + } + + for (var future : futures) { + future.get(); + } + + executor.shutdown(); + + var loadPlan = loadPlanBuilder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + + // using a batch scanner can read from lots of tablets w/ less RPCs + try (var scanner = c.createBatchScanner(tableName)) { + // use a scan server so that tablets do not need to be hosted + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + scanner.setRanges(List.of(new Range())); + var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(rowsExpected, rowsSeen); + } + } + } + /** * @return Map w/ keys that are end rows of tablets and the value is a true when the tablet has a * current location.