This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new a83f29ea1e Fixed MemoryStarvedScanIT (#5309)
a83f29ea1e is described below

commit a83f29ea1e95d709453875c4919ee7e6853ddb5c
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Feb 5 17:57:23 2025 -0500

    Fixed MemoryStarvedScanIT (#5309)
    
    The IT was passing for a while, but then started failing consistently.
    Found that the MemoryFreeingIterator had an error in the way that it
    was being executed. Modified that and fixed a couple of other issues.
---
 .../test/functional/MemoryFreeingIterator.java     | 18 ++++++++--
 .../test/functional/MemoryStarvedScanIT.java       | 40 +++++++++++++++-------
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 95e7e2e7a4..077033c1e0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -18,9 +18,15 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
+import java.io.IOException;
+import java.util.Map;
 
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,15 +36,21 @@ public class MemoryFreeingIterator extends WrappingIterator {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryFreeingIterator.class);
 
+  @Override
   @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
-  public MemoryFreeingIterator() throws InterruptedException {
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
     LOG.info("Try to free consumed memory - will block until isRunningLowOnMemory returns false.");
     MemoryConsumingIterator.freeBuffers();
+    System.gc();
     while (this.isRunningLowOnMemory()) {
+      LOG.info("Waiting for runningLowOnMemory to return false");
       System.gc();
       // wait for LowMemoryDetector to recognize the memory is free.
-      Thread.sleep(SECONDS.toMillis(1));
+      UtilWaitThread.sleep(1000);
     }
     LOG.info("isRunningLowOnMemory returned false - memory available");
   }
+
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 8e4da36bb9..88289e6db9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -45,7 +45,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -63,6 +63,8 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
 public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
   public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
@@ -177,10 +179,14 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
   }
 
   static void freeServerMemory(AccumuloClient client) throws Exception {
-    // Instantiating this class on the TabletServer will free the memory as it
-    // frees the buffers created by the MemoryConsumingIterator in its constructor.
-    client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
-        WrappingIterator.class.getName());
+    // Scan the metadata table as this is not prevented when the
+    // server is low on memory. Use the MemoryFreeingIterator as it
+    // will free the memory on init()
+    try (Scanner scanner = client.createScanner(AccumuloTable.METADATA.tableName())) {
+      IteratorSetting is = new IteratorSetting(11, MemoryFreeingIterator.class, Map.of());
+      scanner.addScanIterator(is);
+      var unused = Iterables.size(scanner); // consume the key/values
+    }
   }
 
   @Test
@@ -344,6 +350,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
       // generate enough data so more than one batch is returned.
       ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
+      to.flush(table, null, null, true);
 
       try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
           Scanner memoryConsumingScanner = client.createScanner(table)) {
@@ -371,10 +378,19 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         // Wait for a batch to be returned
         waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
 
-        // This should block until the GarbageCollectionLogger runs and notices that the
-        // VM is low on memory.
-        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
-        assertTrue(consumingIter.hasNext());
+        // Make sure memory is free before trying to consume memory
+        while (LOW_MEM_DETECTED.get() == 1) {
+          freeServerMemory(client);
+          Thread.sleep(5000);
+        }
+
+        // This should block until the LowMemoryDetector runs and notices that the
+        // VM is low on memory. Creating the iterator should be enough to init the
+        // MemoryConsumingIterator on the server side and call seek on it as it
+        // has the readahead threshold set. We can confirm that this is the case
+        // by looking at the metrics after the call.
+        var unused = memoryConsumingScanner.iterator();
+        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 
         // Grab the current paused count, the number of rows fetched by the memoryConsumingScanner
         // has not increased
@@ -419,11 +435,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
   private boolean verifyBatchedStalled(final int currCount, final int startCount,
       final double paused, final double returned) {
-    if (startCount == currCount && SCAN_START_DELAYED.doubleValue() > paused) {
+    if (startCount <= currCount && SCAN_START_DELAYED.doubleValue() > paused) {
       LOG.debug("found expected pause because of low memory");
       return true;
     }
-    if (startCount == currCount && SCAN_RETURNED_EARLY.doubleValue() > returned) {
+    if (startCount <= currCount && SCAN_RETURNED_EARLY.doubleValue() > returned) {
       LOG.debug("found expected early return because of low memory");
       return true;
     }
@@ -506,7 +522,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         Thread.sleep(1500);
         assertEquals(currentCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
-        assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
 
         // check across multiple low memory checks and metric updates that low memory detected
         // remains set

Reply via email to