This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8353f3e91457583719fa27381406b155cc1eec24 Merge: b662aa9800 df8c5cb51f Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Mar 29 00:15:44 2025 +0000 Merge branch '2.1' .../apache/accumulo/core/util/PeekingIterator.java | 20 +++--- .../accumulo/core/util/PeekingIteratorTest.java | 78 +++++++++++++++++++++- .../apache/accumulo/test/functional/BulkNewIT.java | 43 ++++++++++++ 3 files changed, 131 insertions(+), 10 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index edf90636db,26784ccfb5..1166a2dfa5 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@@ -1016,162 -622,47 +1016,205 @@@ public class BulkNewIT extends SharedMi } } + /* + * Test bulk importing to tablets with different availability settings. For hosted tablets bulk + * import should refresh them. + */ + @Test + public void testAvailability() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + addSplits(c, tableName, "0100 0200 0300 0400 0500"); + + c.tableOperations().setTabletAvailability(tableName, new Range("0100", false, "0200", true), + TabletAvailability.HOSTED); + c.tableOperations().setTabletAvailability(tableName, new Range("0300", false, "0400", true), + TabletAvailability.HOSTED); + c.tableOperations().setTabletAvailability(tableName, new Range("0400", false, null, true), + TabletAvailability.UNHOSTED); + + // verify tablet availabilities are as expected + var expectedAvailabilites = + Map.of("0100", TabletAvailability.ONDEMAND, "0200", TabletAvailability.HOSTED, "0300", + TabletAvailability.ONDEMAND, "0400", TabletAvailability.HOSTED, "0500", + TabletAvailability.UNHOSTED, "NULL", TabletAvailability.UNHOSTED); + assertEquals(expectedAvailabilites, 
getTabletAvailabilities(c, tableName)); + + var expectedHosting = expectedAvailabilites.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> e.getValue() == TabletAvailability.HOSTED)); + + // Wait for the tablets w/ a TabletAvailability of HOSTED to have a location. Waiting for this + // ensures when the bulk import runs that some tablets will be hosted and others will not. + Wait.waitFor(() -> getLocationStatus(c, tableName).equals(expectedHosting)); + + // create files that straddle tablets w/ different availability settings + writeData(fs, dir + "/f1.", aconf, 0, 150); + writeData(fs, dir + "/f2.", aconf, 151, 250); + writeData(fs, dir + "/f3.", aconf, 251, 350); + writeData(fs, dir + "/f4.", aconf, 351, 450); + writeData(fs, dir + "/f5.", aconf, 451, 550); + + c.tableOperations().importDirectory(dir).to(tableName).load(); + + // Verify bulk import operation did not change anything w.r.t. tablet hosting, should not + // cause ondemand tablets to be hosted. + assertEquals(expectedAvailabilites, getTabletAvailabilities(c, tableName)); + assertEquals(expectedHosting, getLocationStatus(c, tableName)); + + // after the import, data should be visible + try (var scanner = c.createScanner(tableName)) { + var expected = IntStream.range(0, 401).mapToObj(i -> String.format("%04d", i)) + .collect(Collectors.toSet()); + // scan up to, but not into, the unhosted tablets (the last row scanned is 0400) + scanner.setRange(new Range("0000", true, "0400", true)); + var seen = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(expected, seen); + } + + try (var scanner = c.createScanner(tableName)) { + var expected = IntStream.range(0, 551).mapToObj(i -> String.format("%04d", i)) + .collect(Collectors.toSet()); + // with eventual scan should see data imported into unhosted tablets + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + var seen = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + 
assertEquals(expected, seen); + } + } + } + + /* + * Bulk import thousands of files into a table w/ thousands of tablets. Each file holds a single + * row and the load plan maps each file to its own tablet. + */ + @Test + public void testManyTabletAndFiles() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + c.tableOperations().addSplits(tableName, splits); + + var executor = Executors.newFixedThreadPool(16); + var futures = new ArrayList<Future<?>>(); + + var loadPlanBuilder = LoadPlan.builder(); + var rowsExpected = new HashSet<>(); + var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList()); + // The order in which imports are added to the load plan should not matter so test that. + Collections.shuffle(imports); + for (var data : imports) { + String filename = "f" + data + "."; + loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1), + row(data)); + var future = executor.submit(() -> { + writeData(fs, dir + "/" + filename, aconf, data, data); + return null; + }); + futures.add(future); + rowsExpected.add(row(data)); + } + + for (var future : futures) { + future.get(); + } + + // NOTE(review): not reached if a future.get() above throws; consider a try/finally + executor.shutdown(); + + var loadPlan = loadPlanBuilder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + + // using a batch scanner can read from lots of tablets w/ fewer RPCs + try (var scanner = c.createBatchScanner(tableName)) { + // use a scan server so that tablets do not need to be hosted + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + scanner.setRanges(List.of(new Range())); + var rowsSeen = scanner.stream().map(e -> 
e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(rowsExpected, rowsSeen); + } + } + } + + /** + * @return Map w/ keys that are end rows of tablets and the value is true when the 
tablet has a + * current location. The tablet w/ a null end row is keyed as "NULL". Tablet metadata is read + * directly, so c must be a ClientContext (it is cast below). + */ + private static Map<String,Boolean> getLocationStatus(AccumuloClient c, String tableName) + throws Exception { + ClientContext ctx = (ClientContext) c; + var tableId = ctx.getTableId(tableName); + try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) { + return tablets.stream().collect(Collectors.toMap(tm -> { + var er = tm.getExtent().endRow(); + return er == null ? "NULL" : er.toString(); + }, tm -> { + var loc = tm.getLocation(); + return loc != null && loc.getType() == TabletMetadata.LocationType.CURRENT; + })); + } + } + + /** + * @return Map w/ keys that are end rows of tablets and the value is the tablet's availability. + * The tablet w/ a null end row is keyed as "NULL". + */ + private static Map<String,TabletAvailability> getTabletAvailabilities(AccumuloClient c, + String tableName) throws TableNotFoundException { + try (var tabletsInfo = c.tableOperations().getTabletInformation(tableName, new Range())) { + return tabletsInfo.collect(Collectors.toMap(ti -> { + var er = ti.getTabletId().getEndRow(); + return er == null ? 
"NULL" : er.toString(); + }, TabletInformation::getTabletAvailability)); + } + } + + /* + * Bulk import 5 files into a table w/ 1000 tablets using a load plan that maps each file to about + * 200 consecutive tablets. The table is recreated w/ TABLE_BULK_MAX_TABLETS set to 500 and all + * tablets initially HOSTED. + */ + @Test + public void testManyTablets() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testManyTablets-"); + // NOTE(review): fs is not declared in this method (presumably a field of this class) and, + // unlike the other tests, dir is not created w/ fs.mkdirs here -- confirm both are intended. + writeData(fs, dir + "/f1.", aconf, 0, 199); + writeData(fs, dir + "/f2.", aconf, 200, 399); + writeData(fs, dir + "/f3.", aconf, 400, 599); + writeData(fs, dir + "/f4.", aconf, 600, 799); + writeData(fs, dir + "/f5.", aconf, 800, 999); + + var splits = IntStream.range(1, 1000).mapToObj(BulkNewIT::row).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + + // faster to recreate the table w/ all of its splits than to add lots of splits afterwards + c.tableOperations().delete(tableName); - c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); ++ var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "500"); ++ c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits) ++ .withInitialTabletAvailability(TabletAvailability.HOSTED).setProperties(props)); + + var lpBuilder = LoadPlan.builder(); + lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, null, row(1)); + IntStream.range(2, 200) + .forEach(i -> lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, row(i - 1), row(i))); + IntStream.range(200, 400) + .forEach(i -> lpBuilder.loadFileTo("f2.rf", RangeType.TABLE, row(i - 1), row(i))); + IntStream.range(400, 600) + .forEach(i -> lpBuilder.loadFileTo("f3.rf", RangeType.TABLE, row(i - 1), row(i))); + IntStream.range(600, 800) + .forEach(i -> 
lpBuilder.loadFileTo("f4.rf", RangeType.TABLE, row(i - 1), row(i))); + IntStream.range(800, 1000) + .forEach(i -> lpBuilder.loadFileTo("f5.rf", RangeType.TABLE, row(i - 1), row(i))); + + var loadPlan = lpBuilder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + + verifyData(c, tableName, 0, 999, false); + + } + + } + private void addSplits(AccumuloClient client, String tableName, String splitString) 
throws Exception { SortedSet<Text> splits = new TreeSet<>();