This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new a83f29ea1e Fixed MemoryStarvedScanIT (#5309)
a83f29ea1e is described below
commit a83f29ea1e95d709453875c4919ee7e6853ddb5c
Author: Dave Marion <[email protected]>
AuthorDate: Wed Feb 5 17:57:23 2025 -0500
Fixed MemoryStarvedScanIT (#5309)
The IT was passing for a while, but then started failing consistently.
Found that the MemoryFreeingIterator had an error in the way that it
was being executed. Modified that and fixed a couple of other issues.
---
.../test/functional/MemoryFreeingIterator.java | 18 ++++++++--
.../test/functional/MemoryStarvedScanIT.java | 40 +++++++++++++++-------
2 files changed, 43 insertions(+), 15 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 95e7e2e7a4..077033c1e0 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -18,9 +18,15 @@
*/
package org.apache.accumulo.test.functional;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,15 +36,21 @@ public class MemoryFreeingIterator extends WrappingIterator
{
private static final Logger LOG =
LoggerFactory.getLogger(MemoryFreeingIterator.class);
+ @Override
@SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
- public MemoryFreeingIterator() throws InterruptedException {
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
LOG.info("Try to free consumed memory - will block until
isRunningLowOnMemory returns false.");
MemoryConsumingIterator.freeBuffers();
+ System.gc();
while (this.isRunningLowOnMemory()) {
+ LOG.info("Waiting for runningLowOnMemory to return false");
System.gc();
// wait for LowMemoryDetector to recognize the memory is free.
- Thread.sleep(SECONDS.toMillis(1));
+ UtilWaitThread.sleep(1000);
}
LOG.info("isRunningLowOnMemory returned false - memory available");
}
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 8e4da36bb9..88289e6db9 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -45,7 +45,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -63,6 +63,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+
public class MemoryStarvedScanIT extends SharedMiniClusterBase {
public static class MemoryStarvedITConfiguration implements
MiniClusterConfigurationCallback {
@@ -177,10 +179,14 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
}
static void freeServerMemory(AccumuloClient client) throws Exception {
- // Instantiating this class on the TabletServer will free the memory as it
- // frees the buffers created by the MemoryConsumingIterator in its
constructor.
-
client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
- WrappingIterator.class.getName());
+ // Scan the metadata table as this is not prevented when the
+ // server is low on memory. Use the MemoryFreeingIterator as it
+ // will free the memory on init()
+ try (Scanner scanner =
client.createScanner(AccumuloTable.METADATA.tableName())) {
+ IteratorSetting is = new IteratorSetting(11,
MemoryFreeingIterator.class, Map.of());
+ scanner.addScanIterator(is);
+ var unused = Iterables.size(scanner); // consume the key/values
+ }
}
@Test
@@ -344,6 +350,7 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
// generate enough data so more than one batch is returned.
ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
+ to.flush(table, null, null, true);
try (BatchScanner dataConsumingScanner =
client.createBatchScanner(table);
Scanner memoryConsumingScanner = client.createScanner(table)) {
@@ -371,10 +378,19 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
// Wait for a batch to be returned
waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
- // This should block until the GarbageCollectionLogger runs and
notices that the
- // VM is low on memory.
- Iterator<Entry<Key,Value>> consumingIter =
memoryConsumingScanner.iterator();
- assertTrue(consumingIter.hasNext());
+ // Make sure memory is free before trying to consume memory
+ while (LOW_MEM_DETECTED.get() == 1) {
+ freeServerMemory(client);
+ Thread.sleep(5000);
+ }
+
+ // This should block until the LowMemoryDetector runs and notices that
the
+ // VM is low on memory. Creating the iterator should be enough to init
the
+ // MemoryConsumingIterator on the server side and call seek on it as it
+ // has the readahead threshold set. We can confirm that this is the
case
+ // by looking at the metrics after the call.
+ var unused = memoryConsumingScanner.iterator();
+ waitFor(() -> 1 == LOW_MEM_DETECTED.get());
// Grab the current paused count, the number of rows fetched by the
memoryConsumingScanner
// has not increased
@@ -419,11 +435,11 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
private boolean verifyBatchedStalled(final int currCount, final int
startCount,
final double paused, final double returned) {
- if (startCount == currCount && SCAN_START_DELAYED.doubleValue() > paused) {
+ if (startCount <= currCount && SCAN_START_DELAYED.doubleValue() > paused) {
LOG.debug("found expected pause because of low memory");
return true;
}
- if (startCount == currCount && SCAN_RETURNED_EARLY.doubleValue() >
returned) {
+ if (startCount <= currCount && SCAN_RETURNED_EARLY.doubleValue() >
returned) {
LOG.debug("found expected early return because of low memory");
return true;
}
@@ -506,7 +522,7 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
Thread.sleep(1500);
assertEquals(currentCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
- assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
+ assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
// check across multiple low memory checks and metric updates that low
memory detected
// remains set