This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new e1320df709 fix low memory detector and test updates (#3508) e1320df709 is described below commit e1320df709630be844bddd9fd0d8c6045bc5111e Author: EdColeman <d...@etcoleman.com> AuthorDate: Fri Jun 23 20:28:53 2023 +0000 fix low memory detector and test updates (#3508) * fix low memory detector and test updates Co-authored-by: Dave Marion <dlmar...@apache.org> --- .../accumulo/server/mem/LowMemoryDetector.java | 79 ++++----- .../test/functional/MemoryFreeingIterator.java | 30 ++-- .../test/functional/MemoryStarvedMajCIT.java | 8 +- .../test/functional/MemoryStarvedMinCIT.java | 2 +- .../test/functional/MemoryStarvedScanIT.java | 192 ++++++++++++++++----- 5 files changed, 203 insertions(+), 108 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index e08eed772b..fb18c7c682 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -36,20 +36,21 @@ import org.slf4j.LoggerFactory; public class LowMemoryDetector { + private static final Logger LOG = LoggerFactory.getLogger(LowMemoryDetector.class); + @FunctionalInterface - public static interface Action { + public interface Action { void execute(); } public enum DetectionScope { MINC, MAJC, SCAN - }; - - private static final Logger log = LoggerFactory.getLogger(LowMemoryDetector.class); + } private final HashMap<String,Long> prevGcTime = new HashMap<>(); + private long lastMemorySize = 0; - private long gcTimeIncreasedCount = 0; + private int lowMemCount = 0; private long lastMemoryCheckTime = 0; private final Lock memCheckTimeLock = new ReentrantLock(); private volatile boolean runningLowOnMemory = false; @@ -65,15 +66,15 @@ public class LowMemoryDetector { /** * @param context server context * @param scope whether this is being checked in the context of scan or compact code - * @param isUserTable boolean as to whether the table being scanned / compacted is a user table. - * No action is taken for system tables. + * @param isUserTable boolean set true if the table being scanned / compacted is a user table. No + * action is taken for system tables. * @param action Action to perform when this method returns true * @return true if server running low on memory */ public boolean isRunningLowOnMemory(ServerContext context, DetectionScope scope, Supplier<Boolean> isUserTable, Action action) { if (isUserTable.get()) { - Property p = null; + Property p; switch (scope) { case SCAN: p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION; @@ -99,21 +100,19 @@ public class LowMemoryDetector { public void logGCInfo(AccumuloConfiguration conf) { - Double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD); + double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD); memCheckTimeLock.lock(); try { final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); - Runtime rt = Runtime.getRuntime(); StringBuilder sb = new StringBuilder("gc"); boolean sawChange = false; long maxIncreaseInCollectionTime = 0; - for (GarbageCollectorMXBean gcBean : gcmBeans) { Long prevTime = prevGcTime.get(gcBean.getName()); long pt = 0; @@ -135,48 +134,50 @@ public class LowMemoryDetector { prevGcTime.put(gcBean.getName(), time); } - long mem = rt.freeMemory(); - if (maxIncreaseInCollectionTime == 0) { - gcTimeIncreasedCount = 0; - } else { - gcTimeIncreasedCount++; - if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * freeMemoryPercentage) { + Runtime rt = Runtime.getRuntime(); + final long maxConfiguredMemory = rt.maxMemory(); + final long allocatedMemory = rt.totalMemory(); + final long allocatedFreeMemory = rt.freeMemory(); + final long freeMemory = maxConfiguredMemory - (allocatedMemory - allocatedFreeMemory); + final long lowMemoryThreshold = (long) (maxConfiguredMemory * freeMemoryPercentage); + LOG.trace("Memory info: max={}, allocated={}, free={}, free threshold={}", + maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold); + + if (freeMemory < lowMemoryThreshold) { + lowMemCount++; + if (lowMemCount > 3 && !runningLowOnMemory) { runningLowOnMemory = true; - log.warn("Running low on memory"); - gcTimeIncreasedCount = 0; + LOG.warn("Running low on memory: max={}, allocated={}, free={}, free threshold={}", + maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold); + } + } else { + // If we were running low on memory, but are not any longer, than log at warn + // so that it shows up in the logs + if (runningLowOnMemory) { + LOG.warn("Recovered from low memory condition"); } else { - // If we were running low on memory, but are not any longer, than log at warn - // so that it shows up in the logs - if (runningLowOnMemory) { - log.warn("Recovered from low memory condition"); - } else { - log.trace("Not running low on memory"); - } - runningLowOnMemory = false; + LOG.trace("Not running low on memory"); } + runningLowOnMemory = false; + lowMemCount = 0; } - if (mem != lastMemorySize) { + if (freeMemory != lastMemorySize) { sawChange = true; } - String sign = "+"; - if (mem - lastMemorySize <= 0) { - sign = ""; - } - - sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), - rt.totalMemory())); + sb.append(String.format(" freemem=%,d(%+,d) totalmem=%,d", freeMemory, + (freeMemory - lastMemorySize), rt.totalMemory())); if (sawChange) { - log.debug(sb.toString()); + LOG.debug(sb.toString()); } final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { final long diff = now - lastMemoryCheckTime; if (diff > keepAliveTimeout + 1000) { - log.warn(String.format( + LOG.warn(String.format( "GC pause checker not called in a timely" + " fashion. Expected every %.1f seconds but was %.1f seconds since last check", keepAliveTimeout / 1000., diff / 1000.)); @@ -186,10 +187,10 @@ public class LowMemoryDetector { } if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); + Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); } - lastMemorySize = mem; + lastMemorySize = freeMemory; lastMemoryCheckTime = now; } finally { memCheckTimeLock.unlock(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java index 2aa86e9ddd..95e7e2e7a4 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java @@ -20,31 +20,25 @@ package org.apache.accumulo.test.functional; import static java.util.concurrent.TimeUnit.SECONDS; -import java.io.IOException; -import java.util.Map; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class MemoryFreeingIterator extends WrappingIterator { - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, - IteratorEnvironment env) throws IOException { - super.init(source, options, env); + private static final Logger LOG = LoggerFactory.getLogger(MemoryFreeingIterator.class); + + @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test") + public MemoryFreeingIterator() throws InterruptedException { + LOG.info("Try to free consumed memory - will block until isRunningLowOnMemory returns false."); MemoryConsumingIterator.freeBuffers(); while (this.isRunningLowOnMemory()) { + System.gc(); // wait for LowMemoryDetector to recognize the memory is free. - try { - Thread.sleep(SECONDS.toMillis(1)); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException("wait for low memory detector interrupted", ex); - } + Thread.sleep(SECONDS.toMillis(1)); } + LOG.info("isRunningLowOnMemory returned false - memory available"); } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index e4ded303e6..8c9d0f7db3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.functional; +import static org.apache.accumulo.test.util.Wait.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -148,12 +149,9 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { ReadWriteIT.ingest(client, 100, 100, 100, 0, table); compactionThread.start(); - while (paused <= 0) { - Thread.sleep(1000); - paused = MAJC_PAUSED.intValue(); - } + assertTrue(waitFor(() -> MAJC_PAUSED.intValue() > 0)); - MemoryStarvedScanIT.freeServerMemory(client, table); + MemoryStarvedScanIT.freeServerMemory(client); compactionThread.interrupt(); compactionThread.join(); assertNull(error.get()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java index 6418d98ef4..0a001047cc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java @@ -153,7 +153,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase { paused = MINC_PAUSED.intValue(); } - MemoryStarvedScanIT.freeServerMemory(client, table); + MemoryStarvedScanIT.freeServerMemory(client); ingestThread.interrupt(); ingestThread.join(); assertNull(error.get()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index c0036d358d..607a59ece1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY; +import static org.apache.accumulo.test.util.Wait.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -40,6 +41,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -54,6 +56,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MemoryStarvedScanIT extends SharedMiniClusterBase { @@ -83,6 +87,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { public static final double FREE_MEMORY_THRESHOLD = 0.20D; + private static final Logger LOG = LoggerFactory.getLogger(MemoryStarvedScanIT.class); private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder(); private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder(); private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0); @@ -132,11 +137,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { } @BeforeEach - public void beforeEach() { + public void beforeEach() throws Exception { + // Free the server side memory + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + freeServerMemory(client); + } + // allow metric collection to cycle and metric value to reset to zero + waitFor(() -> 0 == LOW_MEM_DETECTED.get()); // Reset the client side counters SCAN_START_DELAYED.reset(); SCAN_START_DELAYED.reset(); - LOW_MEM_DETECTED.set(0); } static void consumeServerMemory(Scanner scanner) { @@ -162,13 +172,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { assertTrue(iter.hasNext()); } - static void freeServerMemory(AccumuloClient client, String table) throws Exception { - try (Scanner scanner = client.createScanner(table)) { - scanner.addScanIterator(new IteratorSetting(11, MemoryFreeingIterator.class, Map.of())); - @SuppressWarnings("unused") - Iterator<Entry<Key,Value>> iter = scanner.iterator(); // init'ing the iterator should be - // enough to free the memory - } + static void freeServerMemory(AccumuloClient client) throws Exception { + // Instantiating this class on the TabletServer will free the memory as it + // frees the buffers created by the MemoryConsumingIterator in its constructor. + client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), + WrappingIterator.class.getName()); } @Test @@ -183,19 +191,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { ReadWriteIT.ingest(client, 10, 10, 10, 0, table); try (Scanner scanner = client.createScanner(table)) { - double returned = SCAN_RETURNED_EARLY.doubleValue(); - double paused = SCAN_START_DELAYED.doubleValue(); + final double returned = SCAN_RETURNED_EARLY.doubleValue(); + final double paused = SCAN_START_DELAYED.doubleValue(); consumeServerMemory(scanner); - // Wait for longer than the memory check interval - Thread.sleep(6_000); + // Wait for The metric that indicates a scan was returned early due to low memory + waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned + && SCAN_START_DELAYED.doubleValue() >= paused); - // The metric that indicates a scan was returned early due to low memory should - // have been incremented. - assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned); - assertTrue(SCAN_START_DELAYED.doubleValue() >= paused); - freeServerMemory(client, table); + freeServerMemory(client); } finally { to.delete(table); } @@ -236,6 +241,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE); t.start(); + LOG.info("Waiting for memory to be consumed"); // Wait until the dataConsumingScanner has started fetching data int currentCount = fetched.get(); @@ -252,30 +258,32 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { // Confirm that some data was fetched by the memoryConsumingScanner currentCount = fetched.get(); assertTrue(currentCount > 0 && currentCount < 100); + LOG.info("Memory consumed"); // Grab the current metric counts, wait - double returned = SCAN_RETURNED_EARLY.doubleValue(); - double paused = SCAN_START_DELAYED.doubleValue(); + final double returned = SCAN_RETURNED_EARLY.doubleValue(); + final double paused = SCAN_START_DELAYED.doubleValue(); Thread.sleep(1500); // One of two conditions could exist here: // The number of fetched rows equals the current count before the wait above // and the SCAN_START_DELAYED has been incremented OR the number of fetched // rows is one more than the current count and the SCAN_RETURNED_EARLY has // been incremented. - assertTrue((currentCount == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused) - || (currentCount + 1 == fetched.get() && SCAN_RETURNED_EARLY.doubleValue() > returned)); + final int currentCountCopy = currentCount; + waitFor( + () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused) + || (currentCountCopy + 1 == fetched.get() + && SCAN_RETURNED_EARLY.doubleValue() > returned)); currentCount = fetched.get(); // Perform the check again - paused = SCAN_START_DELAYED.doubleValue(); - returned = SCAN_RETURNED_EARLY.doubleValue(); - Thread.sleep(1500); + assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get())); assertEquals(currentCount, fetched.get()); - assertEquals(1, LOW_MEM_DETECTED.get()); - // Free the memory which will allow the pausing scanner to continue - freeServerMemory(client, table); + LOG.info("Freeing memory"); + freeServerMemory(client); + LOG.info("Memory freed"); t.join(); assertEquals(30, fetched.get()); @@ -301,20 +309,18 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { try (BatchScanner scanner = client.createBatchScanner(table, client.securityOperations().getUserAuthorizations(client.whoami()), 1)) { - double returned = SCAN_RETURNED_EARLY.doubleValue(); - double paused = SCAN_START_DELAYED.doubleValue(); + + final double returned = SCAN_RETURNED_EARLY.doubleValue(); + final double paused = SCAN_START_DELAYED.doubleValue(); consumeServerMemory(scanner); - // Wait for longer than the memory check interval - Thread.sleep(6000); + // Wait for metric that indicates a scan was returned early due to low memory + assertTrue(waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned + && SCAN_START_DELAYED.doubleValue() >= paused)); + assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get())); - // The metric that indicates a scan was returned early due to low memory should - // have been incremented. - assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned); - assertTrue(SCAN_START_DELAYED.doubleValue() >= paused); - assertEquals(1, LOW_MEM_DETECTED.get()); - freeServerMemory(client, table); + freeServerMemory(client); } finally { to.delete(table); } @@ -324,6 +330,96 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { @Test public void testBatchScanPauses() throws Exception { + String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + TableOperations to = client.tableOperations(); + to.create(table); + + // check memory okay before starting + assertEquals(0, LOW_MEM_DETECTED.get()); + + ReadWriteIT.ingest(client, 10, 3, 10, 0, table); + + try (BatchScanner dataConsumingScanner = client.createBatchScanner(table); + Scanner memoryConsumingScanner = client.createScanner(table)) { + + dataConsumingScanner.addScanIterator( + new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500"))); + dataConsumingScanner.setRanges(Collections.singletonList(new Range())); + Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator(); + AtomicInteger fetched = new AtomicInteger(0); + Thread t = new Thread(() -> { + int i = 0; + while (iter.hasNext()) { + iter.next(); + fetched.set(++i); + } + }); + + memoryConsumingScanner + .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of())); + memoryConsumingScanner.setBatchSize(1); + memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE); + + t.start(); + + // Wait until the dataConsumingScanner has started fetching data + int currentCount = fetched.get(); + while (currentCount == 0) { + Thread.sleep(500); + currentCount = fetched.get(); + } + + // This should block until the GarbageCollectionLogger runs and notices that the + // VM is low on memory. + Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator(); + assertTrue(consumingIter.hasNext()); + + // Confirm that some data was fetched by the dataConsumingScanner + currentCount = fetched.get(); + assertTrue(currentCount > 0 && currentCount < 100); + + // Grab the current paused count, wait two seconds and then confirm that + // the number of rows fetched by the memoryConsumingScanner has not increased + // and that the scan delay counter has increased. + final double returned = SCAN_RETURNED_EARLY.doubleValue(); + final double paused = SCAN_START_DELAYED.doubleValue(); + + final int currentCountCopy = currentCount; + waitFor( + () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused) + || (currentCountCopy + 1 == fetched.get() + && SCAN_RETURNED_EARLY.doubleValue() > returned)); + waitFor(() -> 1 == LOW_MEM_DETECTED.get()); + + // Perform the check again + final double paused2 = SCAN_START_DELAYED.doubleValue(); + final double returned2 = SCAN_RETURNED_EARLY.doubleValue(); + Thread.sleep(1500); + assertEquals(currentCount, fetched.get()); + assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2); + assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue()); + waitFor(() -> 1 == LOW_MEM_DETECTED.get()); + + // Free the memory which will allow the pausing scanner to continue + freeServerMemory(client); + + t.join(); + assertEquals(30, fetched.get()); + + } finally { + to.delete(table); + } + } + } + + /** + * Check that the low memory condition is set and remains set until free memory is available. + */ + @Test + public void testLowMemoryFlapping() throws Exception { + String table = getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { @@ -383,7 +479,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { assertEquals(currentCount, fetched.get()); assertTrue(SCAN_START_DELAYED.doubleValue() >= paused); assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned); - assertEquals(1, LOW_MEM_DETECTED.get()); + assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 1)); // Perform the check again paused = SCAN_START_DELAYED.doubleValue(); @@ -392,21 +488,27 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { assertEquals(currentCount, fetched.get()); assertTrue(SCAN_START_DELAYED.doubleValue() >= paused); assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue()); - assertEquals(1, LOW_MEM_DETECTED.get()); + // check across multiple low memory checks and metric updates that low memory detected + // remains set + int checkCount = 0; + while (checkCount++ < 5) { + Thread.sleep(5_000); + LOG.debug("Check low memory still set. Low Memory Flag: {}, Check count: {}", + LOW_MEM_DETECTED.get(), checkCount); + assertEquals(1, LOW_MEM_DETECTED.get()); + } // Free the memory which will allow the pausing scanner to continue - freeServerMemory(client, table); + freeServerMemory(client); t.join(); assertEquals(30, fetched.get()); - // allow metic collection to cycle. - Thread.sleep(6_000); - assertEquals(0, LOW_MEM_DETECTED.get()); + // allow metric collection to cycle. + assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 0)); } finally { to.delete(table); } } } - }