This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 6d4c37b1d8 Reduce thread contention by shuffling rfile order (#5998)
6d4c37b1d8 is described below
commit 6d4c37b1d81d4071a532a77f4f469b2279d91394
Author: Daniel Roberts <[email protected]>
AuthorDate: Tue Dec 30 16:02:40 2025 -0500
Reduce thread contention by shuffling rfile order (#5998)
* Shuffle the files for ServerSide iterators
Shuffle the mapFile iterators before the MultiIterator is created to
avoid block cache contention.
* Modified MultiIterator instead of ScanDataSource
Realized the changes could be moved down to MultiIterator so the
behavior was similar for deepCopies as well as constructed
MultiIterators.
Added a test for deepCopy since one did not exist
* formatting change
* Update
core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java
Co-authored-by: Keith Turner <[email protected]>
* Adds shuffle prop and fileManager shuffling
Adds the table property for shuffling files.
Adds shuffling for files in the FileManager.
Moves the shuffling logic into a separate Iterator class and changes the
ScanDataSource code to select the specific iterator class.
* Clean up constructors for MultiIterator
Support shuffled iterators in GenerateSplits and OfflineIterator
* fix formatting
* Remove duplicate test code
Removed the duplicate test code by extending the original test class
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../accumulo/core/client/rfile/RFileScanner.java | 15 ++++-
.../accumulo/core/clientImpl/OfflineIterator.java | 8 ++-
.../org/apache/accumulo/core/conf/Property.java | 3 +
.../accumulo/core/file/rfile/GenerateSplits.java | 4 +-
.../core/iteratorsImpl/system/MultiIterator.java | 29 ++++++---
.../system/MultiShuffledIterator.java | 54 ++++++++++++++++
.../core/iterators/system/MultiIteratorTest.java | 72 ++++++++++++++++++++--
.../system/MultiShuffledIteratorTest.java | 36 +++++++++++
.../org/apache/accumulo/server/fs/FileManager.java | 8 +++
.../iterators/SystemIteratorEnvironmentImpl.java | 2 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 14 ++++-
.../test/performance/scan/CollectTabletStats.java | 2 +-
12 files changed, 222 insertions(+), 25 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 9266a53e51..49e424b456 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -61,6 +61,7 @@ import
org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment;
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
@@ -286,10 +287,20 @@ class RFileScanner extends ScannerOptions implements
Scanner {
}
SortedKeyValueIterator<Key,Value> iterator;
+ boolean shuffled = tableConf.getBoolean(Property.TABLE_SHUFFLE_SOURCES);
+
if (opts.bounds != null) {
- iterator = new MultiIterator(readers, opts.bounds);
+ if (shuffled) {
+ iterator = new MultiShuffledIterator(readers, opts.bounds);
+ } else {
+ iterator = new MultiIterator(readers, opts.bounds);
+ }
} else {
- iterator = new MultiIterator(readers, false);
+ if (shuffled) {
+ iterator = new MultiShuffledIterator(readers);
+ } else {
+ iterator = new MultiIterator(readers);
+ }
}
Set<ByteSequence> families = Collections.emptySet();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index 7ec99bd9c4..251523b445 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -56,6 +56,7 @@ import
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -243,7 +244,12 @@ class OfflineIterator implements
Iterator<Entry<Key,Value>> {
readers.add(reader);
}
- MultiIterator multiIter = new MultiIterator(readers, extent);
+ MultiIterator multiIter;
+ if (tableCC.getBoolean(Property.TABLE_SHUFFLE_SOURCES)) {
+ multiIter = new MultiShuffledIterator(readers, extent.toDataRange());
+ } else {
+ multiIter = new MultiIterator(readers, extent.toDataRange());
+ }
ClientIteratorEnvironment.Builder iterEnvBuilder =
new
ClientIteratorEnvironment.Builder().withAuthorizations(authorizations)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fa6bc2170c..17edff8d2e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1198,6 +1198,9 @@ public enum Property {
"The maximum amount of memory that will be used to cache results of a
client query/scan. "
+ "Once this limit is reached, the buffered data is sent to the
client.",
"1.3.5"),
+ TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN,
+ "Shuffle the opening order for Rfiles to reduce thread contention on
file open operations.",
+ "2.1.5"),
TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION,
PropertyType.FILENAME_EXT,
"Change the type of file a table writes.", "1.3.5"),
TABLE_LOAD_BALANCER("table.balancer",
"org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer",
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
index 865210a970..3937ca383b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
@@ -338,7 +338,7 @@ public class GenerateSplits implements KeywordExecutable {
readers.add(reader);
fileReaders.add(reader);
}
- iterator = new MultiIterator(readers, false);
+ iterator = new MultiIterator(readers);
iterator.seek(new Range(), Collections.emptySet(), false);
splitArray = getQuantiles(iterator, numSplits);
} finally {
@@ -372,7 +372,7 @@ public class GenerateSplits implements KeywordExecutable {
readers.add(reader);
fileReaders.add(reader);
}
- iterator = new MultiIterator(readers, false);
+ iterator = new MultiIterator(readers);
iterator.seek(new Range(), Collections.emptySet(), false);
while (iterator.hasTop()) {
Key key = iterator.getTopKey();
diff --git
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java
index 3b3defc546..a7b7a76dae 100644
---
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java
@@ -28,7 +28,6 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -37,7 +36,7 @@ import
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
*/
public class MultiIterator extends HeapIterator {
- private final List<SortedKeyValueIterator<Key,Value>> iters;
+ private List<SortedKeyValueIterator<Key,Value>> iters;
private final Range fence;
// deep copy with no seek/scan state
@@ -48,11 +47,16 @@ public class MultiIterator extends HeapIterator {
private MultiIterator(MultiIterator other, IteratorEnvironment env) {
super(other.iters.size());
- this.iters = new ArrayList<>();
+ var tmpIters = new ArrayList<SortedKeyValueIterator<Key,Value>>();
this.fence = other.fence;
for (SortedKeyValueIterator<Key,Value> iter : other.iters) {
- iters.add(iter.deepCopy(env));
+ tmpIters.add(iter.deepCopy(env));
}
+ setIters(tmpIters);
+ }
+
+ protected void setIters(List<SortedKeyValueIterator<Key,Value>> iters) {
+ this.iters = iters;
}
private void init() {
@@ -67,25 +71,30 @@ public class MultiIterator extends HeapIterator {
if (seekFence != null && init) {
// throw this exception because multi-iterator does not seek on init,
therefore the
- // fence would not be enforced in anyway, so do not want to give the
impression it
+ // fence would not be enforced in any way, so do not want to give the
impression it
// will enforce this
throw new IllegalArgumentException("Initializing not supported when seek
fence set");
}
this.fence = seekFence;
- this.iters = iters;
+ setIters(iters);
if (init) {
init();
}
}
- public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range
seekFence) {
- this(iters, seekFence, false);
+ /**
+ * Creates a MultiIterator that doesn't have a fence range and therefore
doesn't seek on creation.
+ *
+ * @param iters List of source iterators
+ */
+ public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters) {
+ this(iters, null, false);
}
- public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters2,
KeyExtent extent) {
- this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true),
false);
+ public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range
seekFence) {
+ this(iters, seekFence, false);
}
public MultiIterator(List<SortedKeyValueIterator<Key,Value>> readers,
boolean init) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java
new file mode 100644
index 0000000000..e563c5467e
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iteratorsImpl.system;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A {@link MultiIterator} that shuffles the order of its source iterators. Results are still
+ * returned in sorted key order; only the initial seek ordering of the sources is randomized, so
+ * concurrent scans over the same files are less likely to contend on them in the same sequence.
+ *
+ * <p>
+ * The shuffle draws from {@link ThreadLocalRandom} because {@link Collections#shuffle(List)} uses
+ * a single {@link java.util.Random} shared by all callers, which would itself cause contention.
+ */
+public class MultiShuffledIterator extends MultiIterator {
+
+  /**
+   * Creates an iterator without a seek fence.
+   *
+   * @param readers source iterators; a shuffled copy is kept and this list is not modified
+   */
+  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> readers) {
+    super(readers);
+  }
+
+  /**
+   * Creates an iterator fenced to {@code seekFence}.
+   *
+   * @param iters source iterators; a shuffled copy is kept and this list is not modified
+   * @param seekFence fence range, passed through to {@link MultiIterator}
+   */
+  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range seekFence) {
+    super(iters, seekFence);
+  }
+
+  /**
+   * Creates an iterator, optionally initializing it on construction.
+   *
+   * @param readers source iterators; a shuffled copy is kept and this list is not modified
+   * @param init whether to initialize on construction; passed through to {@link MultiIterator}
+   */
+  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> readers, boolean init) {
+    super(readers, init);
+  }
+
+  /**
+   * Stores a shuffled copy of {@code iters}, leaving the caller's list unmodified.
+   *
+   * <p>
+   * Called from {@link MultiIterator}'s constructors, so it must not rely on state initialized by
+   * this subclass.
+   */
+  @Override
+  protected void setIters(List<SortedKeyValueIterator<Key,Value>> iters) {
+    var shuffled = new ArrayList<>(iters);
+    Collections.shuffle(shuffled, ThreadLocalRandom.current());
+    super.setIters(shuffled);
+  }
+}
diff --git
a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java
b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java
index c9a52242dc..d669565c01 100644
---
a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java
@@ -45,6 +45,22 @@ public class MultiIteratorTest {

   private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<>();

+  /**
+   * Creates the iterator under test, fenced to {@code range}. Subclasses override this and the
+   * {@code Boolean} overload to run these tests against another {@link MultiIterator} type.
+   */
+  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Range range) {
+    return new MultiIterator(list, range);
+  }
+
+  /**
+   * Creates the iterator under test from an init flag. Subclasses that test another
+   * {@link MultiIterator} type must override this as well as the {@code Range} overload.
+   */
+  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Boolean init) {
+    return new MultiIterator(list, init);
+  }
+
   public static Key newKey(int row, long ts) {
     return new Key(newRow(row), ts);
   }
@@ -74,7 +82,7 @@ public class MultiIteratorTest {
MultiIterator mi;
if (endRow == null && prevEndRow == null) {
- mi = new MultiIterator(iters, init);
+ mi = makeIterator(iters, init);
} else {
Range range = new Range(prevEndRow, false, endRow, true);
if (init) {
@@ -82,7 +90,7 @@ public class MultiIteratorTest {
iter.seek(range, Set.of(), false);
}
}
- mi = new MultiIterator(iters, range);
+ mi = makeIterator(iters, range);
if (init) {
mi.seek(range, Set.of(), false);
@@ -204,7 +212,7 @@ public class MultiIteratorTest {
List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
skvil.add(new SortedMapIterator(tm1));
- MultiIterator mi = new MultiIterator(skvil, true);
+ MultiIterator mi = makeIterator(skvil, true);
assertFalse(mi.hasTop());
@@ -285,7 +293,7 @@ public class MultiIteratorTest {
List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
skvil.add(new SortedMapIterator(tm1));
- MultiIterator mi = new MultiIterator(skvil, true);
+ MultiIterator mi = makeIterator(skvil, true);
mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false);
assertTrue(mi.hasTop());
@@ -368,7 +376,7 @@ public class MultiIteratorTest {
KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1),
newRow(0));
- MultiIterator mi = new MultiIterator(skvil, extent);
+ MultiIterator mi = makeIterator(skvil, extent.toDataRange());
Range r1 = new Range((Text) null, (Text) null);
mi.seek(r1, EMPTY_COL_FAMS, false);
@@ -422,4 +430,58 @@ public class MultiIteratorTest {
mi.seek(r7, EMPTY_COL_FAMS, false);
assertFalse(mi.hasTop());
}
+
+ @Test
+ public void testDeepCopy() throws IOException {
+ // Verify that a deep copy iterates independently of the MultiIterator it was copied from.
+ TreeMap<Key,Value> tm1 = new TreeMap<>();
+ newKeyValue(tm1, 0, 3, false, "1");
+ newKeyValue(tm1, 0, 2, false, "2");
+ newKeyValue(tm1, 0, 1, false, "3");
+ newKeyValue(tm1, 0, 0, false, "4");
+ newKeyValue(tm1, 1, 2, false, "5");
+ newKeyValue(tm1, 1, 1, false, "6");
+ newKeyValue(tm1, 1, 0, false, "7");
+ newKeyValue(tm1, 2, 1, false, "8");
+ newKeyValue(tm1, 2, 0, false, "9");
+
+ List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
+ skvil.add(new SortedMapIterator(tm1));
+
+ KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1),
newRow(0));
+ // The extent's data range fences iteration to row 1, whose values are 5, 6 and 7.
+ MultiIterator mi = makeIterator(skvil, extent.toDataRange());
+ MultiIterator miCopy = mi.deepCopy(null);
+ // Advancing either iterator must not move the other.
+ Range r1 = new Range((Text) null, null);
+ mi.seek(r1, EMPTY_COL_FAMS, false);
+ miCopy.seek(r1, EMPTY_COL_FAMS, false);
+ assertTrue(mi.hasTop());
+ assertEquals("5", mi.getTopValue().toString());
+ mi.next();
+ assertTrue(mi.hasTop());
+ assertEquals("6", mi.getTopValue().toString());
+ assertTrue(miCopy.hasTop());
+ assertEquals("5", miCopy.getTopValue().toString());
+ mi.next();
+ assertTrue(mi.hasTop());
+ assertEquals("7", mi.getTopValue().toString());
+ mi.next();
+ assertFalse(mi.hasTop());
+ assertEquals("5", miCopy.getTopValue().toString());
+ // Re-seeking the copy restarts it even though the original is already exhausted.
+ miCopy.seek(r1, EMPTY_COL_FAMS, false);
+
+ assertTrue(miCopy.hasTop());
+ assertEquals("5", miCopy.getTopValue().toString());
+ miCopy.next();
+ assertTrue(miCopy.hasTop());
+ assertEquals("6", miCopy.getTopValue().toString());
+ assertFalse(mi.hasTop());
+ miCopy.next();
+ assertTrue(miCopy.hasTop());
+ assertEquals("7", miCopy.getTopValue().toString());
+ miCopy.next();
+ assertFalse(miCopy.hasTop());
+ }
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java
b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java
new file mode 100644
index 0000000000..50ab6fae5d
--- /dev/null
+++
b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.system;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
+
+/**
+ * Re-runs the {@link MultiIteratorTest} cases that build their iterator via {@code makeIterator},
+ * this time with {@link MultiShuffledIterator}. Shuffling the sources must not change the merged,
+ * sorted output.
+ */
+public class MultiShuffledIteratorTest extends MultiIteratorTest {
+
+  @Override
+  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Range range) {
+    return new MultiShuffledIterator(list, range);
+  }
+
+  // Without this, cases built from an init flag would still exercise the plain MultiIterator.
+  @Override
+  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Boolean init) {
+    return new MultiShuffledIterator(list, init);
+  }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 23ebfd1e23..9a3aad33c3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -469,6 +469,7 @@ public class FileManager {
private final KeyExtent tablet;
private boolean continueOnFailure;
private final CacheProvider cacheProvider;
+ private final boolean shuffleFiles;
ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
tabletReservedReaders = new ArrayList<>();
@@ -479,6 +480,9 @@ public class FileManager {
continueOnFailure = context.getTableConfiguration(tablet.tableId())
.getBoolean(Property.TABLE_FAILURES_IGNORE);
+ shuffleFiles = context.getTableConfiguration(tablet.tableId())
+ .getBoolean(Property.TABLE_SHUFFLE_SOURCES);
+
if (tablet.isMeta()) {
continueOnFailure = false;
}
@@ -496,6 +500,10 @@ public class FileManager {
+ maxOpen + " tablet = " + tablet);
}
+ if (shuffleFiles) {
+ Collections.shuffle(files);
+ }
+
Map<FileSKVIterator,String> newlyReservedReaders =
reserveReaders(tablet, files, continueOnFailure, cacheProvider);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java
index 2705703ad6..3ef3a5fc9b 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java
@@ -125,7 +125,7 @@ public class SystemIteratorEnvironmentImpl extends
ClientIteratorEnvironment
}
ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new
ArrayList<>(topLevelIterators);
allIters.add(iter);
- return new MultiIterator(allIters, false);
+ return new MultiIterator(allIters);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 1e6c408ee5..13c75157c6 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -20,13 +20,13 @@ package org.apache.accumulo.tserver.tablet;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
@@ -35,9 +35,11 @@ import
org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
+import org.apache.accumulo.core.iteratorsImpl.system.HeapIterator;
import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
import
org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.iteratorsImpl.system.StatsIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
@@ -191,7 +193,7 @@ class ScanDataSource implements DataSource {
files = reservation.getSecond();
}
- Collection<InterruptibleIterator> mapfiles =
+ List<InterruptibleIterator> mapfiles =
fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig);
List.of(mapfiles, memIters).forEach(c -> c.forEach(ii ->
ii.setInterruptFlag(interruptFlag)));
@@ -202,7 +204,13 @@ class ScanDataSource implements DataSource {
iters.addAll(mapfiles);
iters.addAll(memIters);
- MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
+ HeapIterator multiIter;
+ if (tablet.getContext().getTableConfiguration(tablet.getExtent().tableId())
+ .getBoolean(Property.TABLE_SHUFFLE_SOURCES)) {
+ multiIter = new MultiShuffledIterator(iters,
tablet.getExtent().toDataRange());
+ } else {
+ multiIter = new MultiIterator(iters, tablet.getExtent().toDataRange());
+ }
var builder = new
SystemIteratorEnvironmentImpl.Builder(tablet.getContext())
.withTopLevelIterators(new ArrayList<>()).withScope(IteratorScope.scan)
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index 62e034bbef..ae789e0d88 100644
---
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -438,7 +438,7 @@ public class CollectTabletStats {
iters.addAll(mapfiles);
iters.add(smi);
- MultiIterator multiIter = new MultiIterator(iters, ke);
+ MultiIterator multiIter = new MultiIterator(iters, ke.toDataRange());
SortedKeyValueIterator<Key,Value> delIter =
DeletingIterator.wrap(multiIter, false, Behavior.PROCESS);
ColumnFamilySkippingIterator cfsi = new
ColumnFamilySkippingIterator(delIter);