This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new dc2109ad9c Recreate TabletsMetadata iterator when file ranges are not contiguous (#5341)
dc2109ad9c is described below

commit dc2109ad9cb8c0b6e0f4ea05c696b0429c8289b9
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Mar 28 12:22:39 2025 -0400

    Recreate TabletsMetadata iterator when file ranges are not contiguous (#5341)

    In the Bulk Import v2 LoadFiles step, a single TabletsMetadata object was used to map a
    table's tablets to a set of bulk import files. In the case where only a small percentage of
    tablets was involved in the bulk import, a majority of the table's tablets would still be
    evaluated. In the case where bulk imports were not importing into contiguous tablets, the
    code would simply iterate over the table's tablets until it found the next starting point.

    This change creates a new property that determines how many TabletMetadata objects to
    evaluate before recreating the TabletsMetadata object at the new start position. When bulk
    importing into a set of contiguous tablets, or into a majority of the tablets in a table,
    the value of this property should likely be zero, leaving this feature disabled. When
    importing into a non-contiguous set of tablets, this property can be used to speed up bulk
    import processing. This change also closes the TabletsMetadata objects, which was not being
    done previously.

    Related to #5201

    Co-authored-by: Daniel Roberts <ddani...@gmail.com>
---
 .../org/apache/accumulo/core/conf/Property.java    |  12 ++
 .../apache/accumulo/core/util/PeekingIterator.java |  37 ++++
 .../accumulo/core/util/PeekingIteratorTest.java    |  88 ++++++++
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  32 ++-
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  | 154 ++++++++------
 .../manager/tableOps/bulkVer2/LoadFilesTest.java   |   2 +-
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |  76 ++++---
 .../apache/accumulo/test/functional/BulkNewIT.java |  51 +++--
 .../test/functional/BulkNewMetadataSkipIT.java     | 221 +++++++++++++++++++++
 9 files changed, 548 insertions(+), 125 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ff0cad4bab..39955d0998 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1220,6 +1220,18 @@ public enum Property {
"The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. " + "This property is only enforced in the new bulk import API.", "2.1.0"), + TABLE_BULK_SKIP_THRESHOLD("table.bulk.metadata.skip.distance", "0", PropertyType.COUNT, "When performing bulk v2 imports to a table, the Manager iterates over the tables metadata" + " tablets sequentially. When importing files into a small table or into all or a majority" + " of tablets of a large table then the tablet metadata information for most tablets will be needed." + " However, when importing files into a small number of non-contiguous tablets in a large table, then" + " the Manager will look at each tablets metadata when it could be skipped. The value of this" + " property tells the Manager if, and when, it should set up a new scanner over the metadata" + " table instead of just iterating over tablet metadata to find the matching tablet. Setting up" + " a new scanner is analogous to performing a seek in an iterator, but it has a cost. A value of zero (default) disables" + " this feature.
A non-zero value enables this feature and the Manager will setup a new scanner" + + " when the tablet metadata distance is above the supplied value.", + "2.1.4"), TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY, "The durability used to write to the write-ahead log. Legal values are:" + " none, which skips the write-ahead log; log, which sends the data to the" diff --git a/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java b/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java index f410174e73..c7b85ec159 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java @@ -19,6 +19,9 @@ package org.apache.accumulo.core.util; import java.util.Iterator; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; public class PeekingIterator<E> implements Iterator<E> { @@ -91,4 +94,38 @@ public class PeekingIterator<E> implements Iterator<E> { } return top != null; } + + /** + * Advances the underlying iterator looking for a match, inspecting up to {@code limit} elements + * from the iterator. If this method finds a match to the predicate, then it will return true and + * will be positioned before the matching element (peek() and next() will return the matching + * element). If this method does not find a match because the underlying iterator ended before + * {@code limit}, then it will return false and hasNext will also return false. Otherwise, if this + * method does not find a match, then it will return false and be positioned before the limit + * element (peek() and next() will return the {@code limit} element). + * + * @param predicate condition that we are looking for, parameter could be null, so the Predicate + * implementation needs to handle this. + * @param limit number of times that we should look for a match, parameter must be a positive int + * @return true if an element matched the predicate or false otherwise. When true hasNext() will + * return true and peek() and next() will return the matching element. When false + * hasNext() may return false if the end has been reached, or hasNext() may return true in + * which case peek() and next() will return the element {@code limit} positions ahead of + * where this iterator was before this method was called. + */ + public boolean findWithin(Predicate<E> predicate, int limit) { + Preconditions.checkArgument(limit > 0); + for (int i = 0; i < limit; i++) { + if (predicate.test(peek())) { + return true; + } else if (i < (limit - 1)) { + if (hasNext()) { + next(); + } else { + return false; + } + } + } + return false; + } } diff --git a/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java new file mode 100644 index 0000000000..27bf76a725 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/PeekingIteratorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Iterator; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; + +public class PeekingIteratorTest { + + @Test + public void testPeek() { + Iterator<Integer> ints = IntStream.range(1, 11).iterator(); + PeekingIterator<Integer> peek = new PeekingIterator<>(ints); + + assertEquals(1, peek.peek()); + + for (int i = 1; i < 11; i++) { + assertTrue(peek.hasNext()); + assertEquals(i, peek.next()); + if (i == 10) { + assertNull(peek.peek()); + } else { + assertEquals(i + 1, peek.peek()); + } + } + + assertFalse(peek.hasNext()); + assertNull(peek.next()); + } + + @Test + public void testFind() { + + Iterator<Integer> ints = IntStream.range(1, 11).iterator(); + PeekingIterator<Integer> peek = new PeekingIterator<>(ints); + + assertThrows(IllegalArgumentException.class, () -> peek.findWithin(e -> false, -1)); + assertEquals(1, peek.peek()); + assertTrue(peek.findWithin((x) -> x != null && x == 4, 5)); + assertTrue(peek.hasNext()); + assertEquals(4, peek.next()); + assertEquals(5, peek.peek()); + + // Advance the iterator 2 times looking for 7. + // This will return false, but will advance + // twice leaving the iterator at 6. 
+ assertFalse(peek.findWithin((x) -> x != null && x == 7, 2)); + + assertTrue(peek.hasNext()); + assertEquals(6, peek.peek()); + assertEquals(6, peek.next()); + + assertTrue(peek.findWithin((x) -> x != null && x == 8, 2)); + assertTrue(peek.hasNext()); + assertEquals(8, peek.next()); + + // Try to advance past the end + assertFalse(peek.findWithin((x) -> x != null && x == 7, 3)); + assertFalse(peek.hasNext()); + assertNull(peek.next()); + + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index dfa4b98278..abed9684aa 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -123,7 +123,9 @@ class LoadFiles extends ManagerRepo { .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency() .fetch(PREV_ROW, LOCATION, LOADED).build(); - return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid); + int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId) + .getCount(Property.TABLE_BULK_SKIP_THRESHOLD); + return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid, skip); } } @@ -352,8 +354,8 @@ class LoadFiles extends ManagerRepo { */ // visible for testing static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, - LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid) - throws Exception { + LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, Manager manager, long tid, + int skipDistance) throws Exception { PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter); Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek(); @@ -366,15 +368,33 @@ class LoadFiles extends ManagerRepo { ImportTimingStats importTimingStats = new ImportTimingStats(); Timer timer = Timer.startNew(); - try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) { - Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator(); + TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow); + try { + PeekingIterator<TabletMetadata> pi = new PeekingIterator<>(tabletsMetadata.iterator()); while (lmi.hasNext()) { loadMapEntry = lmi.next(); + // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look + // at the next skipDistance tablets before recreating the iterator + if (skipDistance > 0) { + final KeyExtent loadMapKey = loadMapEntry.getKey(); + if (!pi.findWithin( + tm -> PREV_COMP.compare(tm.getPrevEndRow(), loadMapKey.prevEndRow()) >= 0, + skipDistance)) { + log.debug( + "Next load mapping range {} not found in {} tablets, recreating TabletMetadata to jump ahead", + loadMapKey.prevEndRow(), skipDistance); + tabletsMetadata.close(); + tabletsMetadata = factory.newTabletsMetadata(loadMapKey.prevEndRow()); + pi = new PeekingIterator<>(tabletsMetadata.iterator()); + } + } List<TabletMetadata> tablets = - findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats); + findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats); loader.load(tablets, loadMapEntry.getValue()); } + } finally { + tabletsMetadata.close(); } Duration totalProcessingTime = timer.elapsed(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java index 56116365b4..a8689034c0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.util.PeekingIterator; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; @@ -104,6 +105,8 @@ public class PrepBulkImport extends ManagerRepo { @VisibleForTesting interface TabletIterFactory { Iterator<KeyExtent> newTabletIter(Text startRow); + + void close(); } private static boolean equals(Function<KeyExtent,Text> extractor, KeyExtent ke1, KeyExtent ke2) { @@ -116,82 +119,100 @@ public class PrepBulkImport extends ManagerRepo { */ @VisibleForTesting static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi, - TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception { + TabletIterFactory tabletIterFactory, int maxNumTablets, long tid, int skip) throws Exception { + var currRange = lmi.next(); Text startRow = currRange.getKey().prevEndRow(); - Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow); + PeekingIterator<KeyExtent> pi = + new PeekingIterator<>(tabletIterFactory.newTabletIter(startRow)); - KeyExtent currTablet = tabletIter.next(); + try { + KeyExtent currTablet = pi.next(); - var fileCounts = new HashMap<String,Integer>(); - int count; + var fileCounts = new HashMap<String,Integer>(); + int count; - KeyExtent firstTablet = currRange.getKey(); - KeyExtent lastTablet = currRange.getKey(); + KeyExtent firstTablet = currRange.getKey(); + KeyExtent lastTablet = currRange.getKey(); - if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) - && equals(KeyExtent::endRow, currTablet, currRange.getKey())) { - currRange = null; - } + if (!pi.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) + && equals(KeyExtent::endRow, currTablet, currRange.getKey())) { + currRange = null; + } - while (tabletIter.hasNext()) { + while (pi.hasNext()) { - if (currRange == null) { - if (!lmi.hasNext()) { - break; + if (currRange == null) { + if (!lmi.hasNext()) { + break; + } + currRange = lmi.next(); + lastTablet = currRange.getKey(); + } + // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look + // at the next skipDistance tablets before recreating the iterator + if (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) && skip > 0 + && currRange.getKey().prevEndRow() != null) { + final KeyExtent search = currRange.getKey(); + if (!pi.findWithin((ke) -> Objects.equals(ke.prevEndRow(), search.prevEndRow()), skip)) { + log.debug( + "Tablet metadata for prevEndRow {} not found in {} tablets from current tablet {}, recreating TabletMetadata to jump ahead", + search.prevEndRow(), skip, currTablet); + tabletIterFactory.close(); + pi = new PeekingIterator<>(tabletIterFactory.newTabletIter(search.prevEndRow())); + currTablet = pi.next(); + } + } + while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) && pi.hasNext()) { + currTablet = pi.next(); } - 
currRange = lmi.next(); - lastTablet = currRange.getKey(); - } - while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) - && tabletIter.hasNext()) { - currTablet = tabletIter.next(); - } + boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()); - boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()); + if (matchedPrevRow && firstTablet == null) { + firstTablet = currTablet; + } - if (matchedPrevRow && firstTablet == null) { - firstTablet = currTablet; - } + count = matchedPrevRow ? 1 : 0; - count = matchedPrevRow ? 1 : 0; + while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && pi.hasNext()) { + currTablet = pi.next(); + count++; + } - while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) { - currTablet = tabletIter.next(); - count++; - } + if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) { + break; + } - if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) { - break; + if (maxNumTablets > 0) { + int fc = count; + currRange.getValue() + .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum)); + } + currRange = null; } - if (maxNumTablets > 0) { - int fc = count; - currRange.getValue() - .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum)); + if (currRange != null || lmi.hasNext()) { + // merge happened after the mapping was generated and before the table lock was acquired + throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, + TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); } - currRange = null; - } - - if (currRange != null || lmi.hasNext()) { - // merge happened after the mapping was generated and before the table lock was acquired - throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, - TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); - } - if (maxNumTablets > 0) { - fileCounts.values().removeIf(c -> c <= maxNumTablets); - if (!fileCounts.isEmpty()) { - throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, - TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets - + ") number of tablets: " + new TreeMap<>(fileCounts)); + if (maxNumTablets > 0) { + fileCounts.values().removeIf(c -> c <= maxNumTablets); + if (!fileCounts.isEmpty()) { + throw new AcceptableThriftTableOperationException(tableId, null, + TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, + "Files overlap the configured max (" + maxNumTablets + ") number of tablets: " + + new TreeMap<>(fileCounts)); + } } + return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); + } finally { + tabletIterFactory.close(); } - - return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); } private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception { @@ -205,13 +226,29 @@ public class PrepBulkImport extends ManagerRepo { try (LoadMappingIterator lmi = BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { - TabletIterFactory tabletIterFactory = - startRow -> TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) - .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream() - .map(TabletMetadata::getExtent).iterator(); 
+ TabletIterFactory tabletIterFactory = new TabletIterFactory() { + + TabletsMetadata tm = null; + + @Override + public Iterator<KeyExtent> newTabletIter(Text startRow) { + tm = TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) + .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build(); + return tm.stream().map(TabletMetadata::getExtent).iterator(); + } + @Override + public void close() { + if (tm != null) { + tm.close(); + } + } + }; + + int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId) + .getCount(Property.TABLE_BULK_SKIP_THRESHOLD); return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, - tid); + tid, skip); } } @@ -224,7 +261,6 @@ public class PrepBulkImport extends ManagerRepo { Optional.ofNullable(tabletsRange.prevEndRow()).map(Text::getBytes).orElse(null); bulkInfo.lastSplit = Optional.ofNullable(tabletsRange.endRow()).map(Text::getBytes).orElse(null); - log.trace("{} first split:{} last split:{}", FateTxId.formatTid(tid), tabletsRange.prevEndRow(), tabletsRange.endRow()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java index 9afa0ceb57..31d8cfd828 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java @@ -162,7 +162,7 @@ public class LoadFilesTest { Path bulkDir = EasyMock.createMock(Path.class); EasyMock.replay(manager, bulkDir); - LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid); + LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid, 0); EasyMock.verify(manager, bulkDir); List<CaptureLoader.LoadResult> results = cl.getLoadResults(); assertEquals(loadRanges.size(), results.size()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java index 9ff5945e21..5fb2d8181f 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -88,29 +89,37 @@ public class PrepBulkImportTest { }).iterator(); } - private void runTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) throws Exception { + private void runTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges, int skipDistance) + throws Exception { Map<KeyExtent,String> lrm = new HashMap<>(); loadRanges.forEach(e -> lrm.put(e, "f1 f2 f3")); - runTest(lrm, tabletRanges, 100); + runTest(lrm, tabletRanges, 100, skipDistance); } public void runTest(Map<KeyExtent,String> loadRanges, List<KeyExtent> tabletRanges, - int maxTablets) throws Exception { - TabletIterFactory tabletIterFactory = startRow -> { - int start = -1; - - if (startRow == null) { - start = 0; - } else { - for (int i = 0; i < tabletRanges.size(); i++) { - if (tabletRanges.get(i).contains(startRow)) { - start = i; - break; + int maxTablets, int skipDistance) throws Exception { + TabletIterFactory 
tabletIterFactory = new TabletIterFactory() { + + @Override + public Iterator<KeyExtent> newTabletIter(Text startRow) { + int start = -1; + + if (startRow == null) { + start = 0; + } else { + for (int i = 0; i < tabletRanges.size(); i++) { + if (tabletRanges.get(i).contains(startRow)) { + start = i; + break; + } } } + + return tabletRanges.subList(start, tabletRanges.size()).iterator(); } - return tabletRanges.subList(start, tabletRanges.size()).iterator(); + @Override + public void close() {} }; var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList()); @@ -120,8 +129,8 @@ public class PrepBulkImportTest { .map(Text::toString).orElse(null); try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) { - var extent = - PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 10001); + var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, + 10001, skipDistance); assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges); } } @@ -154,23 +163,24 @@ public class PrepBulkImportTest { .collect(Collectors.joining(",")); } - public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) { + public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges, + int skipDistance) { String message = "expected " + toRangeStrings(loadRanges) + " to fail against " + toRangeStrings(tabletRanges); assertThrows(AcceptableThriftTableOperationException.class, - () -> runTest(loadRanges, tabletRanges), message); + () -> runTest(loadRanges, tabletRanges, skipDistance), message); } @Test public void testSingleTablet() throws Exception { - runTest(Arrays.asList(nke(null, null)), Arrays.asList(nke(null, null))); + runTest(Arrays.asList(nke(null, null)), Arrays.asList(nke(null, null)), 0); for (List<KeyExtent> loadRanges : powerSet(nke(null, "b"), nke("b", "k"), nke("k", "r"), nke("r", null))) { if (loadRanges.isEmpty()) { continue; } - runExceptionTest(loadRanges, Arrays.asList(nke(null, null))); + runExceptionTest(loadRanges, Arrays.asList(nke(null, null)), 0); } } @@ -186,8 +196,8 @@ public class PrepBulkImportTest { List<String> requiredRows = List.of("b", "m", "r", "v"); for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", "x"))) { - runTest(loadRanges, - createExtents(Stream.concat(requiredRows.stream(), otherRows.stream()))); + runTest(loadRanges, createExtents(Stream.concat(requiredRows.stream(), otherRows.stream())), + 0); } } } @@ -217,14 +227,14 @@ public class PrepBulkImportTest { // test will all but one of the rows in the load mapping for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", "x"))) { runExceptionTest(loadRanges, - createExtents(Stream.concat(rows2.stream(), otherRows.stream()))); + createExtents(Stream.concat(rows2.stream(), otherRows.stream())), 0); } } if (rows.size() > 1) { // test with none of the rows in the load mapping for (Set<String> otherRows : Sets.powerSet(Set.of("a", "c", "q", "t", "x"))) { - runExceptionTest(loadRanges, createExtents(otherRows.stream())); + runExceptionTest(loadRanges, createExtents(otherRows.stream()), 0); } } } @@ -250,14 +260,14 @@ public class PrepBulkImportTest { int totalTablets = requiredRows.size() + otherRows.size() + 1; if (totalTablets > maxTablets) { - runTooManyTest(loadRanges, tablets, "{f2=" + totalTablets + "}", maxTablets); + runTooManyTest(loadRanges, tablets, "{f2=" + totalTablets + "}", maxTablets, 2); } else { - runTest(loadRanges, 
createExtents(tablets), maxTablets); + runTest(loadRanges, createExtents(tablets), maxTablets, 2); } } runTest(loadRanges, createExtents(Stream.concat(requiredRows.stream(), otherRows.stream())), - 0); + 0, 2); } loadRanges.clear(); @@ -267,20 +277,20 @@ public class PrepBulkImportTest { loadRanges.put(nke("re", "rz"), "f4"); runTooManyTest(loadRanges, Stream.of("ca", "cd", "cz", "e", "ma", "md", "mm", "re", "rz"), - "{f3=4}", 3); + "{f3=4}", 3, 2); runTooManyTest(loadRanges, Stream.of("b", "ca", "cd", "cz", "e", "ma", "md", "mm", "re", "rz"), - "{f3=4}", 3); + "{f3=4}", 3, 2); runTooManyTest(loadRanges, Stream.of("ca", "cd", "cz", "e", "ma", "md", "mm", "re", "rf", "rh", "rm", "rz"), - "{f3=4, f4=4}", 3); + "{f3=4, f4=4}", 3, 2); runTooManyTest(loadRanges, - Stream.of("ca", "cd", "cz", "e", "ma", "mm", "re", "rf", "rh", "rm", "rz"), "{f4=4}", 3); + Stream.of("ca", "cd", "cz", "e", "ma", "mm", "re", "rf", "rh", "rm", "rz"), "{f4=4}", 3, 2); } private void runTooManyTest(Map<KeyExtent,String> loadRanges, Stream<String> tablets, - String expectedMessage, int maxTablets) { + String expectedMessage, int maxTablets, int skipDistance) { var exception = assertThrows(ThriftTableOperationException.class, - () -> runTest(loadRanges, createExtents(tablets), maxTablets)); + () -> runTest(loadRanges, createExtents(tablets), maxTablets, skipDistance)); String message = exception.toString(); assertTrue(exception.toString().contains(expectedMessage), expectedMessage + " -- " + message); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 683461b8c3..1cb6bbf953 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -169,7 +169,7 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testSingleTabletSingleFileNoSplits-"); - String h1 = writeData(dir + "/f1.", aconf, 0, 332); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 332); c.tableOperations().importDirectory(dir).to(tableName).tableTime(setTime).load(); // running again with ignoreEmptyDir set to true will not throw an exception @@ -258,7 +258,7 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testSingleTabletSingleFileNoSplits-"); - String h1 = writeData(dir + "/f1.", aconf, 0, 333); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333); c.tableOperations().importDirectory(dir).to(tableName).load(); @@ -291,7 +291,7 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testBadPermissions-"); - writeData(dir + "/f1.", aconf, 0, 333); + writeData(fs, dir + "/f1.", aconf, 0, 333); Path rFilePath = new Path(dir, "f1." 
+ RFile.EXTENSION); FsPermission originalPerms = fs.getFileStatus(rFilePath).getPermission(); @@ -339,21 +339,21 @@ public class BulkNewIT extends SharedMiniClusterBase { out.close(); // 1 Tablet 0333-null - String h1 = writeData(dir + "/f1.", aconf, 0, 333); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333); hashes.get("0333").add(h1); // 2 Tablets 0666-0334, 0999-0667 - String h2 = writeData(dir + "/f2.", aconf, 334, 999); + String h2 = writeData(fs, dir + "/f2.", aconf, 334, 999); hashes.get("0666").add(h2); hashes.get("0999").add(h2); // 2 Tablets 1333-1000, 1666-1334 - String h3 = writeData(dir + "/f3.", aconf, 1000, 1499); + String h3 = writeData(fs, dir + "/f3.", aconf, 1000, 1499); hashes.get("1333").add(h3); hashes.get("1666").add(h3); // 2 Tablets 1666-1334, >1666 - String h4 = writeData(dir + "/f4.", aconf, 1500, 1999); + String h4 = writeData(fs, dir + "/f4.", aconf, 1500, 1999); hashes.get("1666").add(h4); hashes.get("null").add(h4); @@ -393,21 +393,21 @@ public class BulkNewIT extends SharedMiniClusterBase { out.close(); // 1 Tablet 0333-null - String h1 = writeData(dir + "/f1.", aconf, 0, 333); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333); hashes.get("0333").add(h1); // 3 Tablets 0666-0334, 0999-0667, 1333-1000 - String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333); + String h2 = writeData(fs, dir + "/bad-file.", aconf, 334, 1333); hashes.get("0666").add(h2); hashes.get("0999").add(h2); hashes.get("1333").add(h2); // 1 Tablet 1666-1334 - String h3 = writeData(dir + "/f3.", aconf, 1334, 1499); + String h3 = writeData(fs, dir + "/f3.", aconf, 1334, 1499); hashes.get("1666").add(h3); // 2 Tablets 1666-1334, >1666 - String h4 = writeData(dir + "/f4.", aconf, 1500, 1999); + String h4 = writeData(fs, dir + "/f4.", aconf, 1500, 1999); hashes.get("1666").add(h4); hashes.get("null").add(h4); @@ -453,8 +453,8 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testBulkFile-"); - writeData(dir + "/f1.", aconf, 0, 333); - writeData(dir + "/f2.", aconf, 0, 666); + writeData(fs, dir + "/f1.", aconf, 0, 333); + writeData(fs, dir + "/f2.", aconf, 0, 666); final var importMappingOptions = c.tableOperations().importDirectory(dir).to(tableName); @@ -487,12 +487,12 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testBulkFile-"); Map<String,Set<String>> hashes = new HashMap<>(); - String h1 = writeData(dir + "/f1.", aconf, 0, 333); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333); hashes.put("0333", new HashSet<>(List.of(h1))); - String h2 = writeData(dir + "/f2.", aconf, 0, 666); + String h2 = writeData(fs, dir + "/f2.", aconf, 0, 666); hashes.get("0333").add(h2); hashes.put("0666", new HashSet<>(List.of(h2))); - String h3 = writeData(dir + "/f3.", aconf, 334, 700); + String h3 = writeData(fs, dir + "/f3.", aconf, 334, 700); hashes.get("0666").add(h3); hashes.put("0999", new HashSet<>(List.of(h3))); hashes.put("1333", Set.of()); @@ -571,7 +571,7 @@ public class BulkNewIT extends SharedMiniClusterBase { addSplits(c, tableName, "0333"); - var h1 = writeData(dir + "/f1.", aconf, 333, 333); + var h1 = writeData(fs, dir + "/f1.", aconf, 333, 333); c.tableOperations().importDirectory(dir).to(tableName).load(); @@ -593,7 +593,7 @@ public class BulkNewIT extends SharedMiniClusterBase { String dir = getDir("/testExceptionInMetadataUpdate-"); - String h1 = writeData(dir + "/f1.", aconf, 0, 333); + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 333); var executor = Executors.newSingleThreadExecutor(); // With 
the constraint configured that makes tservers throw an exception on bulk import, the @@ -630,8 +630,8 @@ public class BulkNewIT extends SharedMiniClusterBase { client.tableOperations().addSplits(tableName, splits); } - private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime) - throws Exception { + private static void verifyData(AccumuloClient client, String table, int start, int end, + boolean setTime) throws Exception { try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { Iterator<Entry<Key,Value>> iter = scanner.iterator(); @@ -664,7 +664,7 @@ public class BulkNewIT extends SharedMiniClusterBase { } } - private void verifyMetadata(AccumuloClient client, String tableName, + public static void verifyMetadata(AccumuloClient client, String tableName, Map<String,Set<String>> expectedHashes) { Set<String> endRowsSeen = new HashSet<>(); @@ -691,7 +691,7 @@ public class BulkNewIT extends SharedMiniClusterBase { @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "WEAK_MESSAGE_DIGEST_SHA1"}, justification = "path provided by test; sha-1 is okay for test") - private String hash(String filename) { + public static String hash(String filename) { try { byte[] data = Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", ""))); byte[] hash = MessageDigest.getInstance("SHA1").digest(data); @@ -701,13 +701,12 @@ public class BulkNewIT extends SharedMiniClusterBase { } } - private static String row(int r) { + public static String row(int r) { return String.format("%04d", r); } - private String writeData(String file, AccumuloConfiguration aconf, int s, int e) - throws Exception { - FileSystem fs = getCluster().getFileSystem(); + public static String writeData(FileSystem fs, String file, AccumuloConfiguration aconf, int s, + int e) throws Exception { String filename = file + RFile.EXTENSION; try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder() .forFile(filename, fs, fs.getConf(), NoCryptoServiceFactory.NONE) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java new file mode 100644 index 0000000000..8f286d8f7c --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewMetadataSkipIT.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.test.functional.BulkNewIT.hash; +import static org.apache.accumulo.test.functional.BulkNewIT.row; +import static org.apache.accumulo.test.functional.BulkNewIT.verifyMetadata; +import static org.apache.accumulo.test.functional.BulkNewIT.writeData; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.LoadPlan.RangeType; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.MemoryUnit; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * This test creates a table with 1000 splits and then imports files into a sparse set of the + * tablets. This test also splits the metadata table such that the tablet metadata for each tablet + * of the test table is in its own metadata tablet. The test then runs with different values for the + * TABLE_BULK_SKIP_THRESHOLD property starting with zero (disabled) then increasing. + * + * This test uses AccumuloClusterHarness instead of SharedMiniClusterBase so that we don't have to + * re-merge the metadata table and delete the test table. Doing these two things, and then waiting + * for balancing, takes a long time. It's faster to just start with a clean instance for each test + * run. 
+ */ +public class BulkNewMetadataSkipIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE); + cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "3s"); + cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "25"); + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + public static String writeNonContiguousData(FileSystem fs, String file, + AccumuloConfiguration aconf, int[] rows) throws Exception { + String filename = file + RFile.EXTENSION; + try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder() + .forFile(filename, fs, fs.getConf(), NoCryptoServiceFactory.NONE) + .withTableConfiguration(aconf).build()) { + writer.startDefaultLocalityGroup(); + for (int i : rows) { + writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i))); + } + } + return hash(filename); + } + + @BeforeEach + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + // prime the zk connection + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {} + } + + @ParameterizedTest + @ValueSource(ints = {0, 0, 2, 4, 8, 16, 32, 64, 128}) + public void test(int skipDistance) throws Exception { + + final String tableName = getUniqueNames(1)[0] + "_" + skipDistance; + final AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration(); + final FileSystem fs = getCluster().getFileSystem(); + final String rootPath = getCluster().getTemporaryPath().toString(); + final String dir = rootPath + "/" + tableName; + + fs.delete(new Path(dir), true); + + final SortedSet<Text> splits = new TreeSet<>(); + IntStream.rangeClosed(0, 1000).forEach(i -> splits.add(new Text(String.format("%04d", i)))); + + final NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.setProperties( + Map.of(Property.TABLE_BULK_SKIP_THRESHOLD.getKey(), Integer.toString(skipDistance))); + ntc.withSplits(splits); + + final Map<String,Set<String>> hashes = new HashMap<>(); + IntStream.rangeClosed(0, 1000).forEach(i -> hashes.put(row(i), new HashSet<>())); + hashes.put("null", new HashSet<>()); + + String h1 = writeData(fs, dir + "/f1.", aconf, 0, 11); + IntStream.rangeClosed(0, 11).forEach(i -> hashes.get(row(i)).add(h1)); + + int[] h2Rows = new int[] {11, 199, 200, 204}; + String h2 = writeNonContiguousData(fs, dir + "/f2.", aconf, h2Rows); + for (int i : h2Rows) { + hashes.get(row(i)).add(h2); + } + + int[] h3Rows = new int[] {13, 200, 272, 273}; + String h3 = writeNonContiguousData(fs, dir + "/f3.", aconf, h3Rows); + for (int i : h3Rows) { + hashes.get(row(i)).add(h3); + } + + int[] h4Rows = new int[] {300, 301, 672, 998}; + String h4 = writeNonContiguousData(fs, dir + "/f4.", aconf, h4Rows); + for (int i : h4Rows) { + hashes.get(row(i)).add(h4); + } + + final LoadPlan loadPlan = + LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, row(0), row(11)) + .loadFileTo("f2.rf", RangeType.TABLE, row(10), row(11)) + .loadFileTo("f2.rf", RangeType.FILE, row(199), row(200)) + .loadFileTo("f2.rf", RangeType.TABLE, row(203), row(204)) + .loadFileTo("f3.rf", RangeType.TABLE, row(12), row(13)) + .loadFileTo("f3.rf", RangeType.TABLE, row(199), row(200)) + .loadFileTo("f3.rf", RangeType.FILE, row(272), row(273)) + .loadFileTo("f4.rf", RangeType.FILE, row(300), row(301)) + .loadFileTo("f4.rf", RangeType.TABLE, row(671), row(672)) + 
.loadFileTo("f4.rf", RangeType.TABLE, row(997), row(998)).build(); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + c.tableOperations().create(tableName, ntc); + TableId tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + final SortedSet<Text> metadataSplits = new TreeSet<>(); + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + final String mdTablePrefix = tid.canonical() + ";"; + s.forEach(e -> { + final String row = e.getKey().getRow().toString(); + if (row.startsWith(mdTablePrefix)) { + metadataSplits.add(new Text(row + "\\x00")); + } + }); + c.tableOperations().addSplits(MetadataTable.NAME, metadataSplits); + + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + + verifyData(c, tableName, new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 199, 200, 204, + 272, 273, 300, 301, 672, 998}, false); + verifyMetadata(c, tableName, hashes); + } + } + + public static void verifyData(AccumuloClient client, String table, int[] expectedRows, + boolean setTime) throws Exception { + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + int count = 0; + while (iter.hasNext()) { + Entry<Key,Value> entry = iter.next(); + + String expectedRow = String.format("%04d", expectedRows[count]); + + if (!entry.getKey().getRow().equals(new Text(expectedRow))) { + throw new Exception("unexpected row " + entry.getKey() + " " + expectedRow); + } + + if (Integer.parseInt(entry.getValue().toString()) != expectedRows[count]) { + throw new Exception("unexpected value " + entry + " " + expectedRows[count]); + } + + if (setTime) { + assertEquals(1L, entry.getKey().getTimestamp()); + } + + count++; + } + } + } + +}