This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new a83f29ea1e Fixed MemoryStarvedScanIT (#5309) a83f29ea1e is described below commit a83f29ea1e95d709453875c4919ee7e6853ddb5c Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Feb 5 17:57:23 2025 -0500 Fixed MemoryStarvedScanIT (#5309) The IT was passing for a while, but then started failing consistently. Found that the MemoryFreeingIterator had an error in the way that it was being executed. Modified that and fixed a couple of other issues. --- .../test/functional/MemoryFreeingIterator.java | 18 ++++++++-- .../test/functional/MemoryStarvedScanIT.java | 40 +++++++++++++++------- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java index 95e7e2e7a4..077033c1e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java @@ -18,9 +18,15 @@ */ package org.apache.accumulo.test.functional; -import static java.util.concurrent.TimeUnit.SECONDS; +import java.io.IOException; +import java.util.Map; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.util.UtilWaitThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,15 +36,21 @@ public class MemoryFreeingIterator extends WrappingIterator { private static final Logger LOG = LoggerFactory.getLogger(MemoryFreeingIterator.class); + @Override @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test") - public MemoryFreeingIterator() throws InterruptedException { + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); LOG.info("Try to free consumed memory - will block until isRunningLowOnMemory returns false."); MemoryConsumingIterator.freeBuffers(); + System.gc(); while (this.isRunningLowOnMemory()) { + LOG.info("Waiting for runningLowOnMemory to return false"); System.gc(); // wait for LowMemoryDetector to recognize the memory is free. - Thread.sleep(SECONDS.toMillis(1)); + UtilWaitThread.sleep(1000); } LOG.info("isRunningLowOnMemory returned false - memory available"); } + } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 8e4da36bb9..88289e6db9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -45,7 +45,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -63,6 +63,8 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterables; + public class MemoryStarvedScanIT extends SharedMiniClusterBase { public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback { @@ -177,10 +179,14 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { } static void freeServerMemory(AccumuloClient client) throws Exception { - // Instantiating this class on the TabletServer will free the memory as it - // frees the buffers created by the MemoryConsumingIterator in its constructor. - client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), - WrappingIterator.class.getName()); + // Scan the metadata table as this is not prevented when the + // server is low on memory. Use the MemoryFreeingIterator as it + // will free the memory on init() + try (Scanner scanner = client.createScanner(AccumuloTable.METADATA.tableName())) { + IteratorSetting is = new IteratorSetting(11, MemoryFreeingIterator.class, Map.of()); + scanner.addScanIterator(is); + var unused = Iterables.size(scanner); // consume the key/values + } } @Test @@ -344,6 +350,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { // generate enough data so more than one batch is returned. ReadWriteIT.ingest(client, 1000, 3, 10, 0, table); + to.flush(table, null, null, true); try (BatchScanner dataConsumingScanner = client.createBatchScanner(table); Scanner memoryConsumingScanner = client.createScanner(table)) { @@ -371,10 +378,19 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { // Wait for a batch to be returned waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200); - // This should block until the GarbageCollectionLogger runs and notices that the - // VM is low on memory. - Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator(); - assertTrue(consumingIter.hasNext()); + // Make sure memory is free before trying to consume memory + while (LOW_MEM_DETECTED.get() == 1) { + freeServerMemory(client); + Thread.sleep(5000); + } + + // This should block until the LowMemoryDetector runs and notices that the + // VM is low on memory. Creating the iterator should be enough to init the + // MemoryConsumingIterator on the server side and call seek on it as it + // has the readahead threshold set. We can confirm that this is the case + // by looking at the metrics after the call. + var unused = memoryConsumingScanner.iterator(); + waitFor(() -> 1 == LOW_MEM_DETECTED.get()); // Grab the current paused count, the number of rows fetched by the memoryConsumingScanner // has not increased @@ -419,11 +435,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { private boolean verifyBatchedStalled(final int currCount, final int startCount, final double paused, final double returned) { - if (startCount == currCount && SCAN_START_DELAYED.doubleValue() > paused) { + if (startCount <= currCount && SCAN_START_DELAYED.doubleValue() > paused) { LOG.debug("found expected pause because of low memory"); return true; } - if (startCount == currCount && SCAN_RETURNED_EARLY.doubleValue() > returned) { + if (startCount <= currCount && SCAN_RETURNED_EARLY.doubleValue() > returned) { LOG.debug("found expected early return because of low memory"); return true; } @@ -506,7 +522,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { Thread.sleep(1500); assertEquals(currentCount, fetched.get()); assertTrue(SCAN_START_DELAYED.doubleValue() >= paused); - assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue()); + assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned); // check across multiple low memory checks and metric updates that low memory detected // remains set