This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f6c096bef1 Avoids buffering large split points in memory (#4173) f6c096bef1 is described below commit f6c096bef1c7809b698a9ed9b39d1f625c1e0ee3 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jan 19 13:30:10 2024 -0500 Avoids buffering large split points in memory (#4173) When a tablet needs to split, a set of split points is found. These split points must be under a desired threshold. The code that found these would buffer everything in memory and then remove any that were over the threshold. This commit avoids buffering anything that is over the threshold in the first place. Noticed this behavior while debugging #4172 --- .../apache/accumulo/manager/split/SplitUtils.java | 35 ++++--- .../accumulo/manager/split/SplitUtilsTest.java | 107 ++++++++++++--------- 2 files changed, 84 insertions(+), 58 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java index 8f1ac57729..bce10badb2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; @@ -218,19 +219,19 @@ public class SplitUtils { try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(), tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) { - var splits = findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold)); - splits.removeIf(split -> { - if (split.getLength() >= maxEndRowSize) { + Predicate<ByteSequence> splitPredicate = splitCandidate -> { + if 
(splitCandidate.length() >= maxEndRowSize) { log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(), - split.getLength()); - return true; + splitCandidate.length()); + return false; } - return false; - }); + return true; + }; - return splits; + return findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold), + splitPredicate); } } @@ -243,7 +244,8 @@ public class SplitUtils { return common; } - public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits) { + public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits, + Predicate<ByteSequence> rowPredicate) { Preconditions.checkArgument(desiredSplits >= 1); int numKeys = Iterables.size(tabletIndexIterator); @@ -263,17 +265,24 @@ public class SplitUtils { } count++; + if (count >= Math.round((splits.size() + 1) * interSplitDistance)) { if (prevRow == null) { - splits.add(key.getRow()); + if (rowPredicate.test(key.getRowData())) { + splits.add(key.getRow()); + } } else { var lcl = longestCommonLength(prevRow, key.getRowData()); if (lcl + 1 >= key.getRowData().length()) { - splits.add(key.getRow()); + if (rowPredicate.test(key.getRowData())) { + splits.add(key.getRow()); + } } else { - splits.add(new Text(key.getRowData().subSequence(0, lcl + 1).toArray())); + var shortenedRow = key.getRowData().subSequence(0, lcl + 1); + if (rowPredicate.test(shortenedRow)) { + splits.add(new Text(shortenedRow.toArray())); + } } - } if (splits.size() >= desiredSplits) { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java index 66684a5333..80258e4914 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java @@ -33,9 +33,11 @@ import java.util.SortedSet; import 
java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -124,25 +126,26 @@ public class SplitUtilsTest { }; } + public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits) { + return SplitUtils.findSplits(tabletIndexIterator, desiredSplits, sc -> true); + } + @Test public void testFindSplits() { List<Key> keys = IntStream.range(1, 101).mapToObj(SplitUtilsTest::newKey).collect(toList()); - assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(33, 67), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3)); - assertEquals(newRowsSet(20, 40, 60, 80), SplitUtils.findSplits(keys, 4)); - assertEquals(newRowsSet(17, 33, 50, 67, 83), SplitUtils.findSplits(keys, 5)); - assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6)); - assertEquals(newRowsSet(13, 25, 38, 50, 63, 75, 88), SplitUtils.findSplits(keys, 7)); - assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10)), - SplitUtils.findSplits(keys, 9)); - assertEquals(newRowsSet(IntStream.range(1, 20).map(i -> i * 5)), - SplitUtils.findSplits(keys, 19)); - assertEquals(newRowsSet(IntStream.range(1, 50).map(i -> i * 2)), - SplitUtils.findSplits(keys, 49)); - assertEquals(newRowsSet(IntStream.range(1, 100)), SplitUtils.findSplits(keys, 99)); - assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 100)); - assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 1000)); + assertEquals(newRowsSet(50), findSplits(keys, 1)); + assertEquals(newRowsSet(33, 67), findSplits(keys, 2)); + assertEquals(newRowsSet(25, 50, 75), 
findSplits(keys, 3)); + assertEquals(newRowsSet(20, 40, 60, 80), findSplits(keys, 4)); + assertEquals(newRowsSet(17, 33, 50, 67, 83), findSplits(keys, 5)); + assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), findSplits(keys, 6)); + assertEquals(newRowsSet(13, 25, 38, 50, 63, 75, 88), findSplits(keys, 7)); + assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10)), findSplits(keys, 9)); + assertEquals(newRowsSet(IntStream.range(1, 20).map(i -> i * 5)), findSplits(keys, 19)); + assertEquals(newRowsSet(IntStream.range(1, 50).map(i -> i * 2)), findSplits(keys, 49)); + assertEquals(newRowsSet(IntStream.range(1, 100)), findSplits(keys, 99)); + assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 100)); + assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 1000)); } @Test @@ -150,42 +153,42 @@ public class SplitUtilsTest { List<Key> keys = IntStream.range(1, 101).map(i -> i / 10 * 10).mapToObj(SplitUtilsTest::newKey) .collect(toList()); - assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(i -> i - 10, 30, 60), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowsSet(i -> i - 10, 20, 50, 70), SplitUtils.findSplits(keys, 3)); + assertEquals(newRowsSet(50), findSplits(keys, 1)); + assertEquals(newRowsSet(i -> i - 10, 30, 60), findSplits(keys, 2)); + assertEquals(newRowsSet(i -> i - 10, 20, 50, 70), findSplits(keys, 3)); assertEquals(newRowsSet(IntStream.range(1, 10).map(i -> i * 10), i -> i - 10), - SplitUtils.findSplits(keys, 9)); + findSplits(keys, 9)); assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? null : i - 10), - SplitUtils.findSplits(keys, 19)); + findSplits(keys, 19)); assertEquals(newRowsSet(IntStream.range(0, 11).map(i -> i * 10), i -> i == 0 ? 
null : i - 10), - SplitUtils.findSplits(keys, 100)); + findSplits(keys, 100)); } @Test public void testIndexIterator() { Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101), null, null); - assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(25, 50, 75), SplitUtils.findSplits(keys, 3)); - assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), SplitUtils.findSplits(keys, 6)); - assertEquals(newRowsSet(IntStream.range(1, 101)), SplitUtils.findSplits(keys, 200)); + assertEquals(newRowsSet(50), findSplits(keys, 1)); + assertEquals(newRowsSet(25, 50, 75), findSplits(keys, 3)); + assertEquals(newRowsSet(14, 29, 43, 57, 71, 86), findSplits(keys, 6)); + assertEquals(newRowsSet(IntStream.range(1, 101)), findSplits(keys, 200)); keys = newIndexIterable(IntStream.range(1, 101), null, 50); - assertEquals(newRowsSet(75), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(67, 83), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowsSet(60, 70, 80, 90), SplitUtils.findSplits(keys, 4)); - assertEquals(newRowsSet(IntStream.range(51, 101)), SplitUtils.findSplits(keys, 60)); + assertEquals(newRowsSet(75), findSplits(keys, 1)); + assertEquals(newRowsSet(67, 83), findSplits(keys, 2)); + assertEquals(newRowsSet(60, 70, 80, 90), findSplits(keys, 4)); + assertEquals(newRowsSet(IntStream.range(51, 101)), findSplits(keys, 60)); keys = newIndexIterable(IntStream.range(1, 101), 50, null); - assertEquals(newRowsSet(25), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(17, 33), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowsSet(10, 20, 30, 40), SplitUtils.findSplits(keys, 4)); - assertEquals(newRowsSet(IntStream.range(1, 51)), SplitUtils.findSplits(keys, 60)); + assertEquals(newRowsSet(25), findSplits(keys, 1)); + assertEquals(newRowsSet(17, 33), findSplits(keys, 2)); + assertEquals(newRowsSet(10, 20, 30, 40), findSplits(keys, 4)); + assertEquals(newRowsSet(IntStream.range(1, 51)), findSplits(keys, 60)); keys = 
newIndexIterable(IntStream.range(1, 101), 75, 25); - assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowsSet(17 + 25, 33 + 25), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowsSet(35, 45, 55, 65), SplitUtils.findSplits(keys, 4)); - assertEquals(newRowsSet(IntStream.range(26, 76)), SplitUtils.findSplits(keys, 60)); + assertEquals(newRowsSet(50), findSplits(keys, 1)); + assertEquals(newRowsSet(17 + 25, 33 + 25), findSplits(keys, 2)); + assertEquals(newRowsSet(35, 45, 55, 65), findSplits(keys, 4)); + assertEquals(newRowsSet(IntStream.range(26, 76)), findSplits(keys, 60)); } @Test @@ -194,8 +197,8 @@ public class SplitUtilsTest { // range falls between two index keys. Iterable<Key> keys = newIndexIterable(IntStream.range(1, 101).map(i -> i * 1000), 250, 150); assertFalse(keys.iterator().hasNext()); - assertEquals(Set.of(), SplitUtils.findSplits(keys, 1)); - assertEquals(Set.of(), SplitUtils.findSplits(keys, 2)); + assertEquals(Set.of(), findSplits(keys, 1)); + assertEquals(Set.of(), findSplits(keys, 2)); } @Test @@ -220,11 +223,25 @@ public class SplitUtilsTest { var keys = newIndexIterable( Stream.of("aa11", "aa112", "b", "bg45", "ct", "cz7882", "mn", "mn009", "mnrtssd", "mnz076"), null, null); - assertEquals(newRowSet("c"), SplitUtils.findSplits(keys, 1)); - assertEquals(newRowSet("b", "m"), SplitUtils.findSplits(keys, 2)); - assertEquals(newRowSet("b", "c", "mn0"), SplitUtils.findSplits(keys, 3)); - assertEquals(newRowSet("aa112", "bg", "cz", "mn0"), SplitUtils.findSplits(keys, 4)); + assertEquals(newRowSet("c"), findSplits(keys, 1)); + assertEquals(newRowSet("b", "m"), findSplits(keys, 2)); + assertEquals(newRowSet("b", "c", "mn0"), findSplits(keys, 3)); + assertEquals(newRowSet("aa112", "bg", "cz", "mn0"), findSplits(keys, 4)); assertEquals(newRowSet("aa11", "aa112", "b", "bg", "c", "cz", "m", "mn0", "mnr", "mnz"), - SplitUtils.findSplits(keys, 10)); + findSplits(keys, 10)); + } + + @Test + public void testSplitFilter() { + 
List<Key> keys = IntStream.range(1, 101).mapToObj(SplitUtilsTest::newKey).collect(toList()); + Predicate<ByteSequence> splitFilter = splitCandidate -> { + int i = Integer.parseInt(splitCandidate.toString()); + return i % 3 != 0; + }; + + assertEquals(newRowsSet(50), SplitUtils.findSplits(keys, 1, splitFilter)); + assertEquals(newRowsSet(34, 67), SplitUtils.findSplits(keys, 2, splitFilter)); + assertEquals(newRowsSet(25, 50, 76), SplitUtils.findSplits(keys, 3, splitFilter)); + assertEquals(newRowsSet(20, 40, 61, 80), SplitUtils.findSplits(keys, 4, splitFilter)); } }