This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8353f3e91457583719fa27381406b155cc1eec24
Merge: b662aa9800 df8c5cb51f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Mar 29 00:15:44 2025 +0000

    Merge branch '2.1'

 .../apache/accumulo/core/util/PeekingIterator.java | 20 +++---
 .../accumulo/core/util/PeekingIteratorTest.java    | 78 +++++++++++++++++++++-
 .../apache/accumulo/test/functional/BulkNewIT.java | 43 ++++++++++++
 3 files changed, 131 insertions(+), 10 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index edf90636db,26784ccfb5..1166a2dfa5
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@@ -1016,162 -622,47 +1016,205 @@@ public class BulkNewIT extends SharedMi
      }
    }
  
 +  /*
 +   * Test bulk importing to tablets with different availability settings. For 
hosted tablets bulk
 +   * import should refresh them.
 +   */
 +  @Test
 +  public void testAvailability() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String dir = getDir("/testBulkFile-");
 +      FileSystem fs = getCluster().getFileSystem();
 +      fs.mkdirs(new Path(dir));
 +
 +      addSplits(c, tableName, "0100 0200 0300 0400 0500");
 +
 +      c.tableOperations().setTabletAvailability(tableName, new Range("0100", 
false, "0200", true),
 +          TabletAvailability.HOSTED);
 +      c.tableOperations().setTabletAvailability(tableName, new Range("0300", 
false, "0400", true),
 +          TabletAvailability.HOSTED);
 +      c.tableOperations().setTabletAvailability(tableName, new Range("0400", 
false, null, true),
 +          TabletAvailability.UNHOSTED);
 +
 +      // verify tablet availabilities are as expected
 +      var expectedAvailabilites =
 +          Map.of("0100", TabletAvailability.ONDEMAND, "0200", 
TabletAvailability.HOSTED, "0300",
 +              TabletAvailability.ONDEMAND, "0400", TabletAvailability.HOSTED, 
"0500",
 +              TabletAvailability.UNHOSTED, "NULL", 
TabletAvailability.UNHOSTED);
 +      assertEquals(expectedAvailabilites, getTabletAvailabilities(c, 
tableName));
 +
 +      var expectedHosting = expectedAvailabilites.entrySet().stream()
 +          .collect(Collectors.toMap(Entry::getKey, e -> e.getValue() == 
TabletAvailability.HOSTED));
 +
 +      // Wait for the tablets w/ a TabletAvailability of HOSTED to have a 
location. Waiting for this
 +      // ensures when the bulk import runs that some tablets will be hosted 
and others will not.
 +      Wait.waitFor(() -> getLocationStatus(c, 
tableName).equals(expectedHosting));
 +
 +      // create files that straddle tablets w/ different Availability settings
 +      writeData(fs, dir + "/f1.", aconf, 0, 150);
 +      writeData(fs, dir + "/f2.", aconf, 151, 250);
 +      writeData(fs, dir + "/f3.", aconf, 251, 350);
 +      writeData(fs, dir + "/f4.", aconf, 351, 450);
 +      writeData(fs, dir + "/f5.", aconf, 451, 550);
 +
 +      c.tableOperations().importDirectory(dir).to(tableName).load();
 +
 +      // Verify bulk import operation did not change anything w.r.t. tablet 
hosting, should not
 +      // cause ondemand tablets to be hosted.
 +      assertEquals(expectedAvailabilites, getTabletAvailabilities(c, 
tableName));
 +      assertEquals(expectedHosting, getLocationStatus(c, tableName));
 +
 +      // after import data should be visible
 +      try (var scanner = c.createScanner(tableName)) {
 +        var expected = IntStream.range(0, 401).mapToObj(i -> 
String.format("%04d", i))
 +            .collect(Collectors.toSet());
 +        // scan up to the unhosted tablet
 +        scanner.setRange(new Range("0000", true, "0400", true));
 +        var seen = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
 +            .collect(Collectors.toSet());
 +        assertEquals(expected, seen);
 +      }
 +
 +      try (var scanner = c.createScanner(tableName)) {
 +        var expected = IntStream.range(0, 551).mapToObj(i -> 
String.format("%04d", i))
 +            .collect(Collectors.toSet());
 +        // with eventual scan should see data imported into unhosted tablets
 +        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
 +        var seen = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
 +            .collect(Collectors.toSet());
 +        assertEquals(expected, seen);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testManyTabletAndFiles() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String dir = getDir("/testBulkFile-");
 +      FileSystem fs = getCluster().getFileSystem();
 +      fs.mkdirs(new Path(dir));
 +
 +      TreeSet<Text> splits = IntStream.range(1, 
9000).mapToObj(BulkNewIT::row).map(Text::new)
 +          .collect(Collectors.toCollection(TreeSet::new));
 +      c.tableOperations().addSplits(tableName, splits);
 +
 +      var executor = Executors.newFixedThreadPool(16);
 +      var futures = new ArrayList<Future<?>>();
 +
 +      var loadPlanBuilder = LoadPlan.builder();
 +      var rowsExpected = new HashSet<>();
 +      var imports = IntStream.range(2, 
8999).boxed().collect(Collectors.toList());
 +      // The order in which imports are added to the load plan should not 
matter so test that.
 +      Collections.shuffle(imports);
 +      for (var data : imports) {
 +        String filename = "f" + data + ".";
 +        loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, 
RangeType.TABLE, row(data - 1),
 +            row(data));
 +        var future = executor.submit(() -> {
 +          writeData(fs, dir + "/" + filename, aconf, data, data);
 +          return null;
 +        });
 +        futures.add(future);
 +        rowsExpected.add(row(data));
 +      }
 +
 +      for (var future : futures) {
 +        future.get();
 +      }
 +
 +      executor.shutdown();
 +
 +      var loadPlan = loadPlanBuilder.build();
 +
 +      
c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
 +
 +      // using a batch scanner can read from lots of tablets w/ fewer RPCs
 +      try (var scanner = c.createBatchScanner(tableName)) {
 +        // use a scan server so that tablets do not need to be hosted
 +        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
 +        scanner.setRanges(List.of(new Range()));
 +        var rowsSeen = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
 +            .collect(Collectors.toSet());
 +        assertEquals(rowsExpected, rowsSeen);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * @return Map w/ keys that are end rows of tablets and the value is true when the tablet has a
 +   *         current location.
 +   */
 +  private static Map<String,Boolean> getLocationStatus(AccumuloClient c, 
String tableName)
 +      throws Exception {
 +    ClientContext ctx = (ClientContext) c;
 +    var tableId = ctx.getTableId(tableName);
 +    try (var tablets = 
ctx.getAmple().readTablets().forTable(tableId).build()) {
 +      return tablets.stream().collect(Collectors.toMap(tm -> {
 +        var er = tm.getExtent().endRow();
 +        return er == null ? "NULL" : er.toString();
 +      }, tm -> {
 +        var loc = tm.getLocation();
 +        return loc != null && loc.getType() == 
TabletMetadata.LocationType.CURRENT;
 +      }));
 +    }
 +  }
 +
 +  /**
 +   * @return Map w/ keys that are end rows of tablets and the value is the tablet's availability.
 +   */
 +  private static Map<String,TabletAvailability> 
getTabletAvailabilities(AccumuloClient c,
 +      String tableName) throws TableNotFoundException {
 +    try (var tabletsInfo = 
c.tableOperations().getTabletInformation(tableName, new Range())) {
 +      return tabletsInfo.collect(Collectors.toMap(ti -> {
 +        var er = ti.getTabletId().getEndRow();
 +        return er == null ? "NULL" : er.toString();
 +      }, TabletInformation::getTabletAvailability));
 +    }
 +  }
 +
+   @Test
+   public void testManyTablets() throws Exception {
+ 
+     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+       String dir = getDir("/testManyTablets-");
+       writeData(fs, dir + "/f1.", aconf, 0, 199);
+       writeData(fs, dir + "/f2.", aconf, 200, 399);
+       writeData(fs, dir + "/f3.", aconf, 400, 599);
+       writeData(fs, dir + "/f4.", aconf, 600, 799);
+       writeData(fs, dir + "/f5.", aconf, 800, 999);
+ 
+       var splits = IntStream.range(1, 
1000).mapToObj(BulkNewIT::row).map(Text::new)
+           .collect(Collectors.toCollection(TreeSet::new));
+ 
+       // faster to create a table w/ lots of splits
+       c.tableOperations().delete(tableName);
 -      c.tableOperations().create(tableName, new 
NewTableConfiguration().withSplits(splits));
++      var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "500");
++      c.tableOperations().create(tableName, new 
NewTableConfiguration().withSplits(splits)
++          
.withInitialTabletAvailability(TabletAvailability.HOSTED).setProperties(props));
+ 
+       var lpBuilder = LoadPlan.builder();
+       lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, null, row(1));
+       IntStream.range(2, 200)
+           .forEach(i -> lpBuilder.loadFileTo("f1.rf", RangeType.TABLE, row(i 
- 1), row(i)));
+       IntStream.range(200, 400)
+           .forEach(i -> lpBuilder.loadFileTo("f2.rf", RangeType.TABLE, row(i 
- 1), row(i)));
+       IntStream.range(400, 600)
+           .forEach(i -> lpBuilder.loadFileTo("f3.rf", RangeType.TABLE, row(i 
- 1), row(i)));
+       IntStream.range(600, 800)
+           .forEach(i -> lpBuilder.loadFileTo("f4.rf", RangeType.TABLE, row(i 
- 1), row(i)));
+       IntStream.range(800, 1000)
+           .forEach(i -> lpBuilder.loadFileTo("f5.rf", RangeType.TABLE, row(i 
- 1), row(i)));
+ 
+       var loadPlan = lpBuilder.build();
+ 
+       
c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+ 
+       verifyData(c, tableName, 0, 999, false);
+ 
+     }
+ 
+   }
+ 
    private void addSplits(AccumuloClient client, String tableName, String 
splitString)
        throws Exception {
      SortedSet<Text> splits = new TreeSet<>();

Reply via email to