This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 187733f6342ec19e13c82d570b1b6a4e9e2c6945 Merge: 5cbef68a05 18cdc28690 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Mon Feb 10 20:39:31 2025 -0500 Merge branch '2.1' into 3.1 .../core/client/admin/compaction/CompactionSelector.java | 2 +- .../src/main/java/org/apache/accumulo/core/data/LoadPlan.java | 2 ++ .../org/apache/accumulo/core/fate/zookeeper/ZooCache.java | 11 +---------- .../org/apache/accumulo/server/util/ServiceStatusCmd.java | 1 - .../apache/accumulo/server/util/checkCommand/CheckRunner.java | 3 ++- .../apache/accumulo/test/functional/GracefulShutdownIT.java | 2 ++ .../apache/accumulo/test/functional/MemoryStarvedScanIT.java | 3 ++- 7 files changed, 10 insertions(+), 14 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java index 713a451dc0,d54f612c16..64327ace21 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java @@@ -36,8 -35,7 +36,8 @@@ import org.apache.accumulo.core.iterato /** * This class selects which files a user compaction will compact. It can also be configured per - * table to periodically select files to compact. + * table to periodically select files to compact, although per table functionality is deprecated. - * See {@link org.apache.accumulo.core.spi.compaction.CompactionKind#SELECTOR} for details. ++ * See {@link org.apache.accumulo.core.spi.compaction.CompactionKind} for details. * * @since 2.1.0 */ diff --cc core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index a490a03cfd,d7f924f576..0e81ac7a08 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@@ -227,21 -220,21 +227,12 @@@ public class ZooCache * Runs an operation against ZooKeeper. 
Retries are performed by the retry method when
       * KeeperExceptions occur.
       *
--     * Changes were made in ACCUMULO-4388 so that the run method no longer accepts Zookeeper as an
--     * argument, and instead relies on the ZooRunnable implementation to call
--     * {@link #getZooKeeper()}. Performing the call to retrieving a ZooKeeper Session after caches
--     * are checked has the benefit of limiting ZK connections and blocking as a result of obtaining
--     * these sessions.
--     *
       * @return T the result of the runnable
       */
      abstract T run() throws KeeperException, InterruptedException;
  
      /**
--     * Retry will attempt to call the run method. Run should make a call to {@link #getZooKeeper()}
--     * after checks to cached information are made. This change, per ACCUMULO-4388 ensures that we
--     * don't create a ZooKeeper session when information is cached, and access to ZooKeeper is
--     * unnecessary.
++     * Retry will attempt to call the run method.
       *
       * @return result of the runnable access success ( i.e. no exceptions ).
       */
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
index d1a1ababb4,0000000000..d816fb675b
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
@@@ -1,61 -1,0 +1,62 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.util.checkCommand;
 +
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.cli.ServerUtilOpts;
 +import org.apache.accumulo.server.util.Admin;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public interface CheckRunner {
 +  Logger log = LoggerFactory.getLogger(CheckRunner.class); // implicitly public static final
 +
 +  /**
 +   * Runs this check and returns the status it produced.
 +   *
 +   * @param context server context
 +   * @param opts server util opts. Only applicable for the checks on the root and metadata tables
 +   * @param fixFiles remove dangling file pointers. Only applicable for the checks on the system and
 +   *        user files
-    * @return the {@link Admin.CheckCommand.CheckStatus} resulting from running the check
++   * @return the {@link org.apache.accumulo.server.util.Admin.CheckCommand.CheckStatus} resulting
++   *         from running the check
 +   */
 +  Admin.CheckCommand.CheckStatus runCheck(ServerContext context, ServerUtilOpts opts,
 +      boolean fixFiles) throws Exception;
 +
 +  /**
 +   * Identifies which check this runner performs.
 +   * @return the check that this check runner runs
 +   */
 +  Admin.CheckCommand.Check getCheck();
 +
 +  default void printRunning() { // despite the "print" name, this only emits at TRACE level
 +    String running = "Running check " + getCheck(); // framed by dashed lines of equal width
 +    log.trace("-".repeat(running.length()));
 +    log.trace(running);
 +    log.trace("-".repeat(running.length()));
 +  }
 +
 +  default void printCompleted(Admin.CheckCommand.CheckStatus status) { // TRACE level only
 +    String completed = "Check " + getCheck() + " completed with status " + status;
 +    log.trace("-".repeat(completed.length()));
 +    log.trace(completed);
 +    log.trace("-".repeat(completed.length()));
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
index c8e4eec16f,ad7aef847d..2afba2090e
--- a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
@@@ -18,7 -18,9 +18,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static java.nio.charset.StandardCharsets.UTF_8;
  import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
  import static org.junit.jupiter.api.Assertions.assertTrue;
  
  import java.util.List;
diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 1fc57667d8,0000000000..67b82ce393
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@@ -1,547 -1,0 +1,548 @@@
 +/*
 + * Licensed to the
Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metrics.Metric.LOW_MEMORY;
 +import static org.apache.accumulo.core.metrics.Metric.SCAN_PAUSED_FOR_MEM;
 +import static org.apache.accumulo.core.metrics.Metric.SCAN_RETURN_FOR_MEM;
 +import static org.apache.accumulo.test.util.Wait.waitFor;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
++import static org.junit.jupiter.api.Assertions.assertNotEquals;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.DoubleAdder;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
 +import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.minicluster.MemoryUnit;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 +import org.apache.accumulo.test.metrics.TestStatsDSink;
 +import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
 +import org.apache.hadoop.conf.Configuration;
 +import org.junit.jupiter.api.AfterAll;
 +import org.junit.jupiter.api.BeforeAll;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +
 +public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 +
 +  public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
 +
 +    @Override
 +    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 +      cfg.setNumTservers(1);
 +      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
 +      // Configure the LowMemoryDetector in the TabletServer
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
 +          Double.toString(FREE_MEMORY_THRESHOLD));
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_SCAN_PROTECTION, "true");
 +      // Tell the server processes to use a StatsDMeterRegistry that will be configured
 +      // to push all metrics to the sink we started.
 +      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
 +      cfg.setProperty("general.custom.metrics.opts.logging.step", "5s");
 +      String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
 +          + TestStatsDRegistryFactory.class.getName();
 +      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
 +      Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
 +          TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
 +      cfg.setSystemProperties(sysProps);
 +    }
 +  }
 +
 +  public static final double FREE_MEMORY_THRESHOLD = 0.40D;
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(MemoryStarvedScanIT.class);
 +  private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
 +  private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
 +  private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
 +  private static TestStatsDSink sink;
 +  private static Thread metricConsumer;
 +
 +  @BeforeAll
 +  public static void start() throws Exception {
 +    sink = new TestStatsDSink();
 +    metricConsumer = new Thread(() -> {
 +      while (!Thread.currentThread().isInterrupted()) {
 +        List<String> statsDMetrics = sink.getLines();
 +        for (String line : statsDMetrics) {
 +          if (Thread.currentThread().isInterrupted()) {
 +            break;
 +          }
 +          if (line.startsWith("accumulo")) {
 +            Metric metric = TestStatsDSink.parseStatsDMetric(line);
 +            if (SCAN_PAUSED_FOR_MEM.getName().equals(metric.getName())) {
 +              double val = Double.parseDouble(metric.getValue());
 +              SCAN_START_DELAYED.add(val);
 +            } else if (SCAN_RETURN_FOR_MEM.getName().equals(metric.getName())) {
 +              double val = Double.parseDouble(metric.getValue());
 +              SCAN_RETURNED_EARLY.add(val);
 +            } else if (metric.getName().equals(LOW_MEMORY.getName())) {
 +              String process = metric.getTags().get("process.name");
 +              if (process != null && process.contains("tserver")) {
 +                int val = Integer.parseInt(metric.getValue());
 +                LOW_MEM_DETECTED.set(val);
 +              }
 +            }
 +          }
 +        }
 +      }
 +    });
 +    metricConsumer.start();
 +
 +    SharedMiniClusterBase.startMiniClusterWithConfig(new MemoryStarvedITConfiguration());
 +  }
 +
 +  @AfterAll
 +  public static void stop() throws Exception {
 +    SharedMiniClusterBase.stopMiniCluster();
 +    sink.close();
 +    metricConsumer.interrupt();
 +    metricConsumer.join();
 +  }
 +
 +  @BeforeEach
 +  public void beforeEach() throws Exception {
 +    // Free the server side memory
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +      freeServerMemory(client);
 +    }
 +    // allow metric collection to cycle and metric value to reset to zero
 +    waitFor(() -> 0 == LOW_MEM_DETECTED.get());
 +    // Reset the client side counters
 +    SCAN_START_DELAYED.reset();
 +    SCAN_RETURNED_EARLY.reset();
 +  }
 +
 +  static void consumeServerMemory(Scanner scanner) {
 +    // This iterator will attempt to consume all free memory in the TabletServer
 +    scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
 +    scanner.setBatchSize(1);
 +    // Set the ReadaheadThreshold to a large number so that another background thread
 +    // that performs read-ahead of KV pairs is not started.
 +    scanner.setReadaheadThreshold(Long.MAX_VALUE);
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +    // This should block until the LowMemoryDetector runs and notices that the
 +    // VM is low on memory.
 +    assertTrue(iter.hasNext());
 +  }
 +
 +  private void consumeServerMemory(BatchScanner scanner) {
 +    // This iterator will attempt to consume all free memory in the TabletServer
 +    scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
 +    scanner.setRanges(Collections.singletonList(new Range()));
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +    // This should block until the LowMemoryDetector runs and notices that the
 +    // VM is low on memory.
 +    assertTrue(iter.hasNext());
 +  }
 +
 +  static void freeServerMemory(AccumuloClient client) throws Exception {
 +    // Scan the metadata table as this is not prevented when the
 +    // server is low on memory. Use the MemoryFreeingIterator as it
 +    // will free the memory on init()
 +    try (Scanner scanner = client.createScanner(AccumuloTable.METADATA.tableName())) {
 +      IteratorSetting is = new IteratorSetting(11, MemoryFreeingIterator.class, Map.of());
 +      scanner.addScanIterator(is);
-       var unused = Iterables.size(scanner); // consume the key/values
++      assertNotEquals(0, Iterables.size(scanner)); // consume the key/values
 +    }
 +  }
 +
 +  @Test
 +  public void testScanReturnsEarlyDueToLowMemory() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 +
 +      try (Scanner scanner = client.createScanner(table)) {
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        consumeServerMemory(scanner);
 +
 +        // Wait for the metric that indicates a scan was returned early due to low memory
 +        waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
 +            && SCAN_START_DELAYED.doubleValue() >= paused);
 +
 +        freeServerMemory(client);
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testScanPauses() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
 +
 +      try (Scanner dataConsumingScanner = client.createScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "2000")));
 +        dataConsumingScanner.setBatchSize(1);
 +        dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.incrementAndGet();
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +        LOG.info("Waiting for memory to be consumed");
 +
 +        // Wait until the dataConsumingScanner has started fetching data
 +        int currentCount = fetched.get();
 +        while (currentCount == 0) {
 +          Thread.sleep(500);
 +          currentCount = fetched.get();
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +
 +        // Confirm that some data was fetched by the dataConsumingScanner
 +        currentCount = fetched.get();
 +        assertTrue(currentCount > 0 && currentCount < 100);
 +        LOG.info("Memory consumed after reading {} rows", currentCount);
 +
 +        // Grab the current metric counts, wait
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +        Thread.sleep(1500);
 +        // One of two conditions could exist here:
 +        // The number of fetched rows equals the current count before the wait above
 +        // and the SCAN_START_DELAYED has been incremented OR the number of fetched
 +        // rows is one more than the current count and the SCAN_RETURNED_EARLY has
 +        // been incremented.
 +        final int currentCountCopy = currentCount;
 +        waitFor(
 +            () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
 +                || (currentCountCopy + 1 == fetched.get()
 +                    && SCAN_RETURNED_EARLY.doubleValue() > returned));
 +        currentCount = fetched.get();
 +
 +        // Perform the check again
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +        assertEquals(currentCount, fetched.get());
 +
 +        // Free the memory which will allow the pausing scanner to continue
 +        LOG.info("Freeing memory");
 +        freeServerMemory(client);
 +        LOG.info("Memory freed");
 +
 +        t.join();
 +        assertEquals(30, fetched.get());
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testBatchScanReturnsEarlyDueToLowMemory() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 +
 +      try (BatchScanner scanner = client.createBatchScanner(table,
 +          client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
 +
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        consumeServerMemory(scanner);
 +
 +        // Wait for metric that indicates a scan was returned early due to low memory
 +        waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
 +            && SCAN_START_DELAYED.doubleValue() >= paused);
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +      } finally {
 +        freeServerMemory(client);
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testBatchScanPauses() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      // generate enough data so more than one batch is returned.
 +      ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
 +      to.flush(table, null, null, true);
 +
 +      try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        // add enough delay that batch does not return all rows and we get more than one batch
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "50")));
 +        dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.incrementAndGet();
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +
 +        // Wait for a batch to be returned
 +        waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
 +
 +        // Make sure memory is free before trying to consume memory
 +        while (LOW_MEM_DETECTED.get() == 1) {
 +          freeServerMemory(client);
 +          Thread.sleep(5000);
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Snapshot the scan metric counters so the checks below can confirm that the server
 +        // delayed or returned scans early while low on memory, and that the number of rows
 +        // fetched by the dataConsumingScanner stops advancing.
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        // Confirm that some data was fetched by the dataConsumingScanner
 +        int currentCount = fetched.get();
 +        LOG.info("rows read in first batch: {}", currentCount);
 +        // check some, but not all rows have been read
 +        assertTrue(currentCount > 0 && currentCount < 3000);
 +
 +        final int startCount = currentCount;
 +        waitFor(() -> verifyBatchedStalled(fetched.get(), startCount, paused, returned),
 +            MINUTES.toMillis(2), SECONDS.toMillis(2));
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Perform the check again - checking that rows fetched not advancing
 +        final double paused2 = SCAN_START_DELAYED.doubleValue();
 +        final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(startCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
 +        assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Free the memory which will allow the pausing scanner to continue
 +        freeServerMemory(client);
 +
 +        waitFor(() -> 0 == LOW_MEM_DETECTED.get());
 +
 +        t.join();
 +        // check that the remaining rows have been read
 +        assertEquals(3000, fetched.get());
 +
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  private boolean verifyBatchedStalled(final int currCount, final int startCount,
 +      final double paused, final double returned) {
 +    if (startCount <= currCount && SCAN_START_DELAYED.doubleValue() > paused) {
 +      LOG.debug("found expected pause because of low memory");
 +      return true;
 +    }
 +    if (startCount <= currCount && SCAN_RETURNED_EARLY.doubleValue() > returned) {
 +      LOG.debug("found expected early return because of low memory");
 +      return true;
 +    }
 +    LOG.info(
 +        "waiting for low memory pause. prev count: {}, curr count: {}, paused: {}, returned: {}",
 +        startCount, currCount, SCAN_START_DELAYED.doubleValue(), SCAN_RETURNED_EARLY.doubleValue());
 +    return false;
 +  }
 +
 +  /**
 +   * Check that the low memory condition is set and remains set until free memory is available.
 +   */
 +  @Test
 +  public void testLowMemoryFlapping() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
 +
 +      try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500")));
 +        dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          int i = 0;
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.set(++i);
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +
 +        // Wait until the dataConsumingScanner has started fetching data
 +        int currentCount = fetched.get();
 +        while (currentCount == 0) {
 +          Thread.sleep(500);
 +          currentCount = fetched.get();
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +
 +        // Confirm that some data was fetched by the dataConsumingScanner
 +        currentCount = fetched.get();
 +        assertTrue(currentCount > 0 && currentCount < 100);
 +
 +        // Grab the current metric counts, wait 1.5 seconds and then confirm that the
 +        // number of rows fetched by the dataConsumingScanner has not increased and that
 +        // neither the scan delay nor the early return counter has decreased.
 +        double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        double paused = SCAN_START_DELAYED.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(currentCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
 +        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
 +        waitFor(() -> LOW_MEM_DETECTED.get() == 1);
 +
 +        // Perform the check again
 +        paused = SCAN_START_DELAYED.doubleValue();
 +        returned = SCAN_RETURNED_EARLY.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(currentCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
 +        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
 +
 +        // check across multiple low memory checks and metric updates that low memory detected
 +        // remains set
 +        int checkCount = 0;
 +        while (checkCount++ < 5) {
 +          Thread.sleep(5_000);
 +          LOG.debug("Check low memory still set. Low Memory Flag: {}, Check count: {}",
 +              LOW_MEM_DETECTED.get(), checkCount);
 +          assertEquals(1, LOW_MEM_DETECTED.get());
 +        }
 +        // Free the memory which will allow the pausing scanner to continue
 +        freeServerMemory(client);
 +
 +        t.join();
 +        assertEquals(30, fetched.get());
 +        // allow metric collection to cycle.
 +        waitFor(() -> LOW_MEM_DETECTED.get() == 0);
 +
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +}