This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 54e5758727b65af1f47b3beddfce6f48cbbcf202 Merge: ff07df3a5a 6d4c37b1d8 Author: Daniel Roberts ddanielr <[email protected]> AuthorDate: Wed Dec 31 21:49:03 2025 +0000 Merge branch '2.1' .../accumulo/core/client/rfile/RFileScanner.java | 15 ++++- .../accumulo/core/clientImpl/OfflineIterator.java | 8 ++- .../org/apache/accumulo/core/conf/Property.java | 3 + .../accumulo/core/file/rfile/GenerateSplits.java | 4 +- .../core/iteratorsImpl/system/MultiIterator.java | 35 +++++++---- .../system/MultiShuffledIterator.java | 54 ++++++++++++++++ .../core/iterators/system/MultiIteratorTest.java | 72 ++++++++++++++++++++-- .../system/MultiShuffledIteratorTest.java | 36 +++++++++++ .../org/apache/accumulo/server/fs/FileManager.java | 7 +++ .../iterators/SystemIteratorEnvironmentImpl.java | 2 +- .../accumulo/tserver/tablet/ScanDataSource.java | 14 ++++- .../test/performance/scan/CollectTabletStats.java | 2 +- 12 files changed, 226 insertions(+), 26 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index c6c88dd5cf,17edff8d2e..dd311d2eda --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -1040,14 -1198,9 +1040,17 @@@ public enum Property "The maximum amount of memory that will be used to cache results of a client query/scan. " + "Once this limit is reached, the buffered data is sent to the client.", "1.3.5"), + TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "100", PropertyType.COUNT, + "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited.", + "2.1.0"), + TABLE_BULK_MAX_TABLET_FILES("table.bulk.max.tablet.files", "100", PropertyType.COUNT, + "The maximum number of files a bulk import can add to a single tablet. When this property " + + "is exceeded for any tablet the entire bulk import operation will fail before making any " + + "changes. Value of 0 is unlimited.", + "4.0.0"), + TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN, + "Shuffle the opening order for Rfiles to reduce thread contention on file open operations.", + "2.1.5"), TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT, "Change the type of file a table writes.", "1.3.5"), TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer", diff --cc core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 77cd69cf01,a7b7a76dae..b33067bdf6 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@@ -31,14 -31,13 +31,17 @@@ import org.apache.accumulo.core.data.Va import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; ++import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; ++ /** * An iterator capable of iterating over other iterators in sorted order. */ - public final class MultiIterator extends HeapIterator { ++@SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW", ++ justification = "Constructor validation is required for proper initialization") + public class MultiIterator extends HeapIterator { private List<SortedKeyValueIterator<Key,Value>> iters; - private Range fence; + private final Range fence; // deep copy with no seek/scan state @Override diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index 10776cca2e,9a3aad33c3..1a3443fa96 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@@ -483,7 -500,11 +487,10 @@@ public class FileManager + maxOpen + " tablet = " + tablet); } + if (shuffleFiles) { + Collections.shuffle(files); + } - - Map<FileSKVIterator,String> newlyReservedReaders = + Map<FileSKVIterator,StoredTabletFile> newlyReservedReaders = reserveReaders(tablet, files, continueOnFailure, cacheProvider); tabletReservedReaders.addAll(newlyReservedReaders.keySet()); diff --cc server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java index ac10b553eb,3ef3a5fc9b..a01ca75ed6 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java @@@ -125,12 -125,7 +125,12 @@@ public class SystemIteratorEnvironmentI } ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<>(topLevelIterators); allIters.add(iter); - return new MultiIterator(allIters, false); + return new MultiIterator(allIters); } + @Override + public boolean isRunningLowOnMemory() { + return getServerContext().getLowMemoryDetector().isRunningLowOnMemory(); + } + } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 82a9232d59,13c75157c6..d639c03e4a --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@@ -191,18 -193,24 +193,24 @@@ class ScanDataSource implements DataSou files = reservation.getSecond(); } - Collection<InterruptibleIterator> datafiles = - List<InterruptibleIterator> mapfiles = ++ List<InterruptibleIterator> datafiles = fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig); - List.of(mapfiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); + List.of(datafiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); List<SortedKeyValueIterator<Key,Value>> iters = - new ArrayList<>(mapfiles.size() + memIters.size()); + new ArrayList<>(datafiles.size() + memIters.size()); - iters.addAll(mapfiles); + iters.addAll(datafiles); iters.addAll(memIters); - MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); + HeapIterator multiIter; + if (tablet.getContext().getTableConfiguration(tablet.getExtent().tableId()) + .getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { + multiIter = new MultiShuffledIterator(iters, tablet.getExtent().toDataRange()); + } else { + multiIter = new MultiIterator(iters, tablet.getExtent().toDataRange()); + } var builder = new SystemIteratorEnvironmentImpl.Builder(tablet.getContext()) .withTopLevelIterators(new ArrayList<>()).withScope(IteratorScope.scan) diff --cc test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index 286a3587f7,ae789e0d88..600d1906c7 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@@ -436,12 -433,12 +436,12 @@@ public class CollectTabletStats SortedMapIterator smi = new SortedMapIterator(new TreeMap<>()); - List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(mapfiles.size() + 1); + List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(dataFiles.size() + 1); - iters.addAll(mapfiles); + iters.addAll(dataFiles); iters.add(smi); - MultiIterator multiIter = new MultiIterator(iters, ke); + MultiIterator multiIter = new MultiIterator(iters, ke.toDataRange()); SortedKeyValueIterator<Key,Value> delIter = DeletingIterator.wrap(multiIter, false, Behavior.PROCESS); ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
