This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 187733f6342ec19e13c82d570b1b6a4e9e2c6945
Merge: 5cbef68a05 18cdc28690
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Feb 10 20:39:31 2025 -0500

    Merge branch '2.1' into 3.1

 .../core/client/admin/compaction/CompactionSelector.java      |  2 +-
 .../src/main/java/org/apache/accumulo/core/data/LoadPlan.java |  2 ++
 .../org/apache/accumulo/core/fate/zookeeper/ZooCache.java     | 11 +----------
 .../org/apache/accumulo/server/util/ServiceStatusCmd.java     |  1 -
 .../apache/accumulo/server/util/checkCommand/CheckRunner.java |  3 ++-
 .../apache/accumulo/test/functional/GracefulShutdownIT.java   |  2 ++
 .../apache/accumulo/test/functional/MemoryStarvedScanIT.java  |  3 ++-
 7 files changed, 10 insertions(+), 14 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java
index 713a451dc0,d54f612c16..64327ace21
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java
@@@ -36,8 -35,7 +36,8 @@@ import org.apache.accumulo.core.iterato
  
  /**
   * This class selects which files a user compaction will compact. It can also 
be configured per
 - * table to periodically select files to compact.
 + * table to periodically select files to compact, although per table 
functionality is deprecated.
-  * See {@link 
org.apache.accumulo.core.spi.compaction.CompactionKind#SELECTOR} for details.
++ * See {@link org.apache.accumulo.core.spi.compaction.CompactionKind} for 
details.
   *
   * @since 2.1.0
   */
diff --cc 
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
index a490a03cfd,d7f924f576..0e81ac7a08
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
@@@ -227,21 -220,21 +227,12 @@@ public class ZooCache 
       * Runs an operation against ZooKeeper. Retries are performed by the 
retry method when
       * KeeperExceptions occur.
       *
--     * Changes were made in ACCUMULO-4388 so that the run method no longer 
accepts Zookeeper as an
--     * argument, and instead relies on the ZooRunnable implementation to call
--     * {@link #getZooKeeper()}. Performing the call to retrieving a ZooKeeper 
Session after caches
--     * are checked has the benefit of limiting ZK connections and blocking as 
a result of obtaining
--     * these sessions.
--     *
       * @return T the result of the runnable
       */
      abstract T run() throws KeeperException, InterruptedException;
  
      /**
--     * Retry will attempt to call the run method. Run should make a call to 
{@link #getZooKeeper()}
--     * after checks to cached information are made. This change, per 
ACCUMULO-4388 ensures that we
--     * don't create a ZooKeeper session when information is cached, and 
access to ZooKeeper is
--     * unnecessary.
++     * Retry will attempt to call the run method.
       *
       * @return result of the runnable access success ( i.e. no exceptions ).
       */
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
index d1a1ababb4,0000000000..d816fb675b
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/CheckRunner.java
@@@ -1,61 -1,0 +1,62 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.util.checkCommand;
 +
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.cli.ServerUtilOpts;
 +import org.apache.accumulo.server.util.Admin;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public interface CheckRunner {
 +  Logger log = LoggerFactory.getLogger(CheckRunner.class);
 +
 +  /**
 +   * Runs the check
 +   *
 +   * @param context server context
 +   * @param opts server util opts. Only applicable for the checks on the root 
and metadata tables
 +   * @param fixFiles remove dangling file pointers. Only applicable for the 
checks on the system and
 +   *        user files
-    * @return the {@link Admin.CheckCommand.CheckStatus} resulting from 
running the check
++   * @return the {@link 
org.apache.accumulo.server.util.Admin.CheckCommand.CheckStatus} resulting
++   *         from running the check
 +   */
 +  Admin.CheckCommand.CheckStatus runCheck(ServerContext context, 
ServerUtilOpts opts,
 +      boolean fixFiles) throws Exception;
 +
 +  /**
 +   *
 +   * @return the check that this check runner runs
 +   */
 +  Admin.CheckCommand.Check getCheck();
 +
 +  default void printRunning() {
 +    String running = "Running check " + getCheck();
 +    log.trace("-".repeat(running.length()));
 +    log.trace(running);
 +    log.trace("-".repeat(running.length()));
 +  }
 +
 +  default void printCompleted(Admin.CheckCommand.CheckStatus status) {
 +    String completed = "Check " + getCheck() + " completed with status " + 
status;
 +    log.trace("-".repeat(completed.length()));
 +    log.trace(completed);
 +    log.trace("-".repeat(completed.length()));
 +  }
 +}
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
index c8e4eec16f,ad7aef847d..2afba2090e
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
@@@ -18,7 -18,9 +18,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static java.nio.charset.StandardCharsets.UTF_8;
  import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
  import static org.junit.jupiter.api.Assertions.assertTrue;
  
  import java.util.List;
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 1fc57667d8,0000000000..67b82ce393
mode 100644,000000..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@@ -1,547 -1,0 +1,548 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metrics.Metric.LOW_MEMORY;
 +import static org.apache.accumulo.core.metrics.Metric.SCAN_PAUSED_FOR_MEM;
 +import static org.apache.accumulo.core.metrics.Metric.SCAN_RETURN_FOR_MEM;
 +import static org.apache.accumulo.test.util.Wait.waitFor;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
++import static org.junit.jupiter.api.Assertions.assertNotEquals;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.DoubleAdder;
 +
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
 +import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.minicluster.MemoryUnit;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 +import org.apache.accumulo.test.metrics.TestStatsDSink;
 +import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
 +import org.apache.hadoop.conf.Configuration;
 +import org.junit.jupiter.api.AfterAll;
 +import org.junit.jupiter.api.BeforeAll;
 +import org.junit.jupiter.api.BeforeEach;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +
 +public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 +
 +  public static class MemoryStarvedITConfiguration implements 
MiniClusterConfigurationCallback {
 +
 +    @Override
 +    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, 
Configuration coreSite) {
 +      cfg.setNumTservers(1);
 +      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
 +      // Configure the LowMemoryDetector in the TabletServer
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
 +          Double.toString(FREE_MEMORY_THRESHOLD));
 +      cfg.setProperty(Property.GENERAL_LOW_MEM_SCAN_PROTECTION, "true");
 +      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
 +      // to push all metrics to the sink we started.
 +      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
 +      cfg.setProperty("general.custom.metrics.opts.logging.step", "5s");
 +      String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
 +          + TestStatsDRegistryFactory.class.getName();
 +      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
 +      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
 +          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
 +      cfg.setSystemProperties(sysProps);
 +    }
 +  }
 +
 +  public static final double FREE_MEMORY_THRESHOLD = 0.40D;
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryStarvedScanIT.class);
 +  private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
 +  private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
 +  private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
 +  private static TestStatsDSink sink;
 +  private static Thread metricConsumer;
 +
 +  @BeforeAll
 +  public static void start() throws Exception {
 +    sink = new TestStatsDSink();
 +    metricConsumer = new Thread(() -> {
 +      while (!Thread.currentThread().isInterrupted()) {
 +        List<String> statsDMetrics = sink.getLines();
 +        for (String line : statsDMetrics) {
 +          if (Thread.currentThread().isInterrupted()) {
 +            break;
 +          }
 +          if (line.startsWith("accumulo")) {
 +            Metric metric = TestStatsDSink.parseStatsDMetric(line);
 +            if (SCAN_PAUSED_FOR_MEM.getName().equals(metric.getName())) {
 +              double val = Double.parseDouble(metric.getValue());
 +              SCAN_START_DELAYED.add(val);
 +            } else if 
(SCAN_RETURN_FOR_MEM.getName().equals(metric.getName())) {
 +              double val = Double.parseDouble(metric.getValue());
 +              SCAN_RETURNED_EARLY.add(val);
 +            } else if (metric.getName().equals(LOW_MEMORY.getName())) {
 +              String process = metric.getTags().get("process.name");
 +              if (process != null && process.contains("tserver")) {
 +                int val = Integer.parseInt(metric.getValue());
 +                LOW_MEM_DETECTED.set(val);
 +              }
 +            }
 +          }
 +        }
 +      }
 +    });
 +    metricConsumer.start();
 +
 +    SharedMiniClusterBase.startMiniClusterWithConfig(new 
MemoryStarvedITConfiguration());
 +  }
 +
 +  @AfterAll
 +  public static void stop() throws Exception {
 +    SharedMiniClusterBase.stopMiniCluster();
 +    sink.close();
 +    metricConsumer.interrupt();
 +    metricConsumer.join();
 +  }
 +
 +  @BeforeEach
 +  public void beforeEach() throws Exception {
 +    // Free the server side memory
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      freeServerMemory(client);
 +    }
 +    // allow metric collection to cycle and metric value to reset to zero
 +    waitFor(() -> 0 == LOW_MEM_DETECTED.get());
 +    // Reset the client side counters
 +    SCAN_START_DELAYED.reset();
 +    SCAN_START_DELAYED.reset();
 +  }
 +
 +  static void consumeServerMemory(Scanner scanner) {
 +    // This iterator will attempt to consume all free memory in the 
TabletServer
 +    scanner.addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
 +    scanner.setBatchSize(1);
 +    // Set the ReadaheadThreshold to a large number so that another 
background thread
 +    // that performs read-ahead of KV pairs is not started.
 +    scanner.setReadaheadThreshold(Long.MAX_VALUE);
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +    // This should block until the LowMemoryDetector runs and notices that the
 +    // VM is low on memory.
 +    assertTrue(iter.hasNext());
 +  }
 +
 +  private void consumeServerMemory(BatchScanner scanner) {
 +    // This iterator will attempt to consume all free memory in the 
TabletServer
 +    scanner.addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
 +    scanner.setRanges(Collections.singletonList(new Range()));
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +    // This should block until the LowMemoryDetector runs and notices that the
 +    // VM is low on memory.
 +    assertTrue(iter.hasNext());
 +  }
 +
 +  static void freeServerMemory(AccumuloClient client) throws Exception {
 +    // Scan the metadata table as this is not prevented when the
 +    // server is low on memory. Use the MemoryFreeingIterator as it
 +    // will free the memory on init()
 +    try (Scanner scanner = 
client.createScanner(AccumuloTable.METADATA.tableName())) {
 +      IteratorSetting is = new IteratorSetting(11, 
MemoryFreeingIterator.class, Map.of());
 +      scanner.addScanIterator(is);
-       var unused = Iterables.size(scanner); // consume the key/values
++      assertNotEquals(0, Iterables.size(scanner)); // consume the key/values
 +    }
 +  }
 +
 +  @Test
 +  public void testScanReturnsEarlyDueToLowMemory() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 +
 +      try (Scanner scanner = client.createScanner(table)) {
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        consumeServerMemory(scanner);
 +
 +        // Wait for The metric that indicates a scan was returned early due 
to low memory
 +        waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
 +            && SCAN_START_DELAYED.doubleValue() >= paused);
 +
 +        freeServerMemory(client);
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testScanPauses() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
 +
 +      try (Scanner dataConsumingScanner = client.createScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"2000")));
 +        dataConsumingScanner.setBatchSize(1);
 +        dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.incrementAndGet();
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +        LOG.info("Waiting for memory to be consumed");
 +
 +        // Wait until the dataConsumingScanner has started fetching data
 +        int currentCount = fetched.get();
 +        while (currentCount == 0) {
 +          Thread.sleep(500);
 +          currentCount = fetched.get();
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices 
that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +
 +        // Confirm that some data was fetched by the memoryConsumingScanner
 +        currentCount = fetched.get();
 +        assertTrue(currentCount > 0 && currentCount < 100);
 +        LOG.info("Memory consumed after reading {} rows", currentCount);
 +
 +        // Grab the current metric counts, wait
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +        Thread.sleep(1500);
 +        // One of two conditions could exist here:
 +        // The number of fetched rows equals the current count before the 
wait above
 +        // and the SCAN_START_DELAYED has been incremented OR the number of 
fetched
 +        // rows is one more than the current count and the 
SCAN_RETURNED_EARLY has
 +        // been incremented.
 +        final int currentCountCopy = currentCount;
 +        waitFor(
 +            () -> (currentCountCopy == fetched.get() && 
SCAN_START_DELAYED.doubleValue() > paused)
 +                || (currentCountCopy + 1 == fetched.get()
 +                    && SCAN_RETURNED_EARLY.doubleValue() > returned));
 +        currentCount = fetched.get();
 +
 +        // Perform the check again
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +        assertEquals(currentCount, fetched.get());
 +
 +        // Free the memory which will allow the pausing scanner to continue
 +        LOG.info("Freeing memory");
 +        freeServerMemory(client);
 +        LOG.info("Memory freed");
 +
 +        t.join();
 +        assertEquals(30, fetched.get());
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testBatchScanReturnsEarlyDueToLowMemory() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 +
 +      try (BatchScanner scanner = client.createBatchScanner(table,
 +          client.securityOperations().getUserAuthorizations(client.whoami()), 
1)) {
 +
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        consumeServerMemory(scanner);
 +
 +        // Wait for metric that indicates a scan was returned early due to 
low memory
 +        waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
 +            && SCAN_START_DELAYED.doubleValue() >= paused);
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +      } finally {
 +        freeServerMemory(client);
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testBatchScanPauses() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      // generate enough data so more than one batch is returned.
 +      ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
 +      to.flush(table, null, null, true);
 +
 +      try (BatchScanner dataConsumingScanner = 
client.createBatchScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        // add enough delay that batch does not return all rows and we get 
more than one batch
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"50")));
 +        dataConsumingScanner.setRanges(Collections.singletonList(new 
Range()));
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.incrementAndGet();
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +
 +        // Wait for a batch to be returned
 +        waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
 +
 +        // Make sure memory is free before trying to consume memory
 +        while (LOW_MEM_DETECTED.get() == 1) {
 +          freeServerMemory(client);
 +          Thread.sleep(5000);
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices 
that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Grab the current paused count, the number of rows fetched by the 
memoryConsumingScanner
 +        // has not increased
 +        // and that the scan delay counter has increased.
 +        final double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        final double paused = SCAN_START_DELAYED.doubleValue();
 +
 +        // Confirm that some data was fetched by the dataConsumingScanner
 +        int currentCount = fetched.get();
 +        LOG.info("rows read in first batch: {}", currentCount);
 +        // check some, but not all rows have been read
 +        assertTrue(currentCount > 0 && currentCount < 3000);
 +
 +        final int startCount = currentCount;
 +        waitFor(() -> verifyBatchedStalled(fetched.get(), startCount, paused, 
returned),
 +            MINUTES.toMillis(2), SECONDS.toMillis(2));
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Perform the check again - checking that rows fetched not advancing
 +        final double paused2 = SCAN_START_DELAYED.doubleValue();
 +        final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(startCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
 +        assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
 +        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 +
 +        // Free the memory which will allow the pausing scanner to continue
 +        freeServerMemory(client);
 +
 +        waitFor(() -> 0 == LOW_MEM_DETECTED.get());
 +
 +        t.join();
 +        // check that remain rows have been read
 +        assertEquals(3000, fetched.get());
 +
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +
 +  private boolean verifyBatchedStalled(final int currCount, final int 
startCount,
 +      final double paused, final double returned) {
 +    if (startCount <= currCount && SCAN_START_DELAYED.doubleValue() > paused) 
{
 +      LOG.debug("found expected pause because of low memory");
 +      return true;
 +    }
 +    if (startCount <= currCount && SCAN_RETURNED_EARLY.doubleValue() > 
returned) {
 +      LOG.debug("found expected early return because of low memory");
 +      return true;
 +    }
 +    LOG.info(
 +        "waiting for low memory pause. prev count: {}, curr count: {}, 
paused: {}, returned: {}",
 +        startCount, currCount, SCAN_START_DELAYED.doubleValue(), 
SCAN_RETURNED_EARLY.doubleValue());
 +    return false;
 +  }
 +
 +  /**
 +   * Check that the low memory condition is set and remains set until free 
memory is available.
 +   */
 +  @Test
 +  public void testLowMemoryFlapping() throws Exception {
 +
 +    String table = getUniqueNames(1)[0];
 +    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      TableOperations to = client.tableOperations();
 +      to.create(table);
 +
 +      // check memory okay before starting
 +      assertEquals(0, LOW_MEM_DETECTED.get());
 +
 +      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
 +
 +      try (BatchScanner dataConsumingScanner = 
client.createBatchScanner(table);
 +          Scanner memoryConsumingScanner = client.createScanner(table)) {
 +
 +        dataConsumingScanner.addScanIterator(
 +            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"500")));
 +        dataConsumingScanner.setRanges(Collections.singletonList(new 
Range()));
 +        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
 +        AtomicInteger fetched = new AtomicInteger(0);
 +        Thread t = new Thread(() -> {
 +          int i = 0;
 +          while (iter.hasNext()) {
 +            iter.next();
 +            fetched.set(++i);
 +          }
 +        });
 +
 +        memoryConsumingScanner
 +            .addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
 +        memoryConsumingScanner.setBatchSize(1);
 +        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 +
 +        t.start();
 +
 +        // Wait until the dataConsumingScanner has started fetching data
 +        int currentCount = fetched.get();
 +        while (currentCount == 0) {
 +          Thread.sleep(500);
 +          currentCount = fetched.get();
 +        }
 +
 +        // This should block until the LowMemoryDetector runs and notices 
that the
 +        // VM is low on memory.
 +        Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
 +        assertTrue(consumingIter.hasNext());
 +
 +        // Confirm that some data was fetched by the dataConsumingScanner
 +        currentCount = fetched.get();
 +        assertTrue(currentCount > 0 && currentCount < 100);
 +
 +        // Grab the current paused count, wait two seconds and then confirm 
that
 +        // the number of rows fetched by the memoryConsumingScanner has not 
increased
 +        // and that the scan delay counter has increased.
 +        double returned = SCAN_RETURNED_EARLY.doubleValue();
 +        double paused = SCAN_START_DELAYED.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(currentCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
 +        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
 +        waitFor(() -> LOW_MEM_DETECTED.get() == 1);
 +
 +        // Perform the check again
 +        paused = SCAN_START_DELAYED.doubleValue();
 +        returned = SCAN_RETURNED_EARLY.doubleValue();
 +        Thread.sleep(1500);
 +        assertEquals(currentCount, fetched.get());
 +        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
 +        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
 +
 +        // check across multiple low memory checks and metric updates that 
low memory detected
 +        // remains set
 +        int checkCount = 0;
 +        while (checkCount++ < 5) {
 +          Thread.sleep(5_000);
 +          LOG.debug("Check low memory still set. Low Memory Flag: {}, Check 
count: {}",
 +              LOW_MEM_DETECTED.get(), checkCount);
 +          assertEquals(1, LOW_MEM_DETECTED.get());
 +        }
 +        // Free the memory which will allow the pausing scanner to continue
 +        freeServerMemory(client);
 +
 +        t.join();
 +        assertEquals(30, fetched.get());
 +        // allow metric collection to cycle.
 +        waitFor(() -> LOW_MEM_DETECTED.get() == 0);
 +
 +      } finally {
 +        to.delete(table);
 +      }
 +    }
 +  }
 +}

Reply via email to