This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d5e593373 Rename GarbageCollectionLogger, optionally pause 
scans/compactions until memory usage is lower (#3161)
6d5e593373 is described below

commit 6d5e5933734f74bb4a3c607a34df32fd93cb8916
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Feb 16 13:28:32 2023 -0500

    Rename GarbageCollectionLogger, optionally pause scans/compactions until 
memory usage is lower (#3161)
    
    Renamed the GarbageCollectionLogger to LowMemoryDetector and added
    properties to configure its interval and memory threshold. The 
LowMemoryDetector
    is passive and will log when free memory is low (default less than 5%). 
Three
    new properties were added that will influence how scan, minc, and majc 
behave
    when the LowMemoryDetector determines that the server is running low on 
memory.
    Scans will either pause at the start of retrieving the next batch of results
    or return the current batch of results early and minor/major compactions 
will
    pause until the low memory condition no longer exists. Server-side user
    iterators can check for low memory conditions by subclassing 
WrappingIterator
    and calling isRunningLowOnMemory(). Four new metrics where added to indicate
    when scans were paused or returned early, or when minor or major compactions
    were paused.
---
 .../core/client/admin/ActiveCompaction.java        |   6 +
 .../core/clientImpl/ActiveCompactionImpl.java      |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  24 +-
 .../core/iterators/IteratorEnvironment.java        |  10 +
 .../core/iterators/SortedKeyValueIterator.java     |  13 +
 .../accumulo/core/iterators/WrappingIterator.java  |  11 +-
 .../accumulo/core/metrics/MetricsProducer.java     |  32 ++
 .../core/tabletserver/thrift/ActiveCompaction.java | 104 +++++-
 core/src/main/thrift/tabletserver.thrift           |   1 +
 .../org/apache/accumulo/server/AbstractServer.java |   9 +
 .../org/apache/accumulo/server/ServerContext.java  |   7 +
 .../accumulo/server/compaction/CompactionInfo.java |   8 +-
 .../server/compaction/CompactionStats.java         |  13 +-
 .../accumulo/server/compaction/FileCompactor.java  |  45 ++-
 ...tionStats.java => PausedCompactionMetrics.java} |  47 +--
 .../iterators/SystemIteratorEnvironment.java       |   5 +
 .../LowMemoryDetector.java}                        |  83 ++++-
 .../coordinator/CompactionCoordinator.java         |  11 -
 .../coordinator/CompactionCoordinatorTest.java     |   3 -
 .../org/apache/accumulo/compactor/Compactor.java   |  30 +-
 .../apache/accumulo/compactor/CompactorTest.java   |   8 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |  18 +-
 .../accumulo/tserver/TabletClientHandler.java      |   4 +-
 .../accumulo/tserver/TabletHostingServer.java      |   6 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  29 +-
 .../tserver/compactions/CompactionManager.java     |   1 +
 .../tserver/metrics/TabletServerScanMetrics.java   |  16 +
 .../accumulo/tserver/tablet/CompactableUtils.java  |   2 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |   4 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   5 +
 .../apache/accumulo/tserver/tablet/TabletBase.java |  60 +++-
 .../test/functional/MemoryConsumingIterator.java   |  91 +++++
 .../test/functional/MemoryFreeingIterator.java     |  23 +-
 .../test/functional/MemoryStarvedMajCIT.java       | 167 +++++++++
 .../test/functional/MemoryStarvedMinCIT.java       | 168 +++++++++
 .../test/functional/MemoryStarvedScanIT.java       | 388 +++++++++++++++++++++
 .../apache/accumulo/test/metrics/MetricsIT.java    |   3 +-
 37 files changed, 1328 insertions(+), 132 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
 
b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
index 28965f7267..3c804fac41 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
@@ -119,6 +119,12 @@ public abstract class ActiveCompaction {
    */
   public abstract long getEntriesWritten();
 
+  /**
+   * @return the number of times the server paused a compaction
+   * @since 3.0.0
+   */
+  public abstract long getPausedCount();
+
   /**
    * @return the per compaction iterators configured
    */
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
index 46c0c1014c..ca36f3df7e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
@@ -102,6 +102,11 @@ public class ActiveCompactionImpl extends ActiveCompaction 
{
     return tac.getEntriesWritten();
   }
 
+  @Override
+  public long getPausedCount() {
+    return tac.getTimesPaused();
+  }
+
   @Override
   public List<IteratorSetting> getIterators() {
     ArrayList<IteratorSetting> ret = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 93fad5e98e..19d22d84e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -275,6 +275,29 @@ public enum Property {
   
GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval",
 "1d",
       PropertyType.TIMEDURATION, "The length of time between generation of new 
secret keys",
       "1.7.0"),
+  GENERAL_LOW_MEM_DETECTOR_INTERVAL("general.low.mem.detector.interval", "5s",
+      PropertyType.TIMEDURATION, "The time interval between low memory 
checks", "3.0.0"),
+  GENERAL_LOW_MEM_DETECTOR_THRESHOLD("general.low.mem.detector.threshold", 
"0.05",
+      PropertyType.FRACTION,
+      "The LowMemoryDetector will report when free memory drops below this 
percentage of total memory",
+      "3.0.0"),
+  GENERAL_LOW_MEM_SCAN_PROTECTION("general.low.mem.protection.scan", "false", 
PropertyType.BOOLEAN,
+      "Scans may be paused or return results early when the server "
+          + "is low on memory and this property is set to true. Enabling this 
property will incur a slight "
+          + "scan performance penalty when the server is not low on memory",
+      "3.0.0"),
+  
GENERAL_LOW_MEM_MINC_PROTECTION("general.low.mem.protection.compaction.minc", 
"false",
+      PropertyType.BOOLEAN,
+      "Minor compactions may be paused when the server "
+          + "is low on memory and this property is set to true. Enabling this 
property will incur a slight "
+          + "compaction performance penalty when the server is not low on 
memory",
+      "3.0.0"),
+  
GENERAL_LOW_MEM_MAJC_PROTECTION("general.low.mem.protection.compaction.majc", 
"false",
+      PropertyType.BOOLEAN,
+      "Major compactions may be paused when the server "
+          + "is low on memory and this property is set to true. Enabling this 
property will incur a slight "
+          + "compaction performance penalty when the server is not low on 
memory",
+      "3.0.0"),
   GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s",
       PropertyType.TIMEDURATION,
       "The maximum amount of time that a Scanner should wait before retrying a 
failed RPC",
@@ -810,7 +833,6 @@ public enum Property {
           + " The resources that are used by default can be seen in"
           + " 
accumulo/server/monitor/src/main/resources/templates/default.ftl",
       "2.0.0"),
-
   // per table properties
   TABLE_PREFIX("table.", null, PropertyType.PREFIX,
       "Properties in this category affect tablet server treatment of tablets,"
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 372a0e49a3..9a78ddf35d 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@ -187,4 +187,14 @@ public interface IteratorEnvironment {
   default TableId getTableId() {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Return whether or not the server is running low on memory
+   *
+   * @return true if server is running low on memory
+   * @since 3.0.0
+   */
+  default boolean isRunningLowOnMemory() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
index be9bfe0653..30efd5f07a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
@@ -149,4 +149,17 @@ public interface SortedKeyValueIterator<K extends 
WritableComparable<?>,V extend
    * @exception UnsupportedOperationException if not supported.
    */
   SortedKeyValueIterator<K,V> deepCopy(IteratorEnvironment env);
+
+  /**
+   * Returns true when running in a server process and the 
GarbageCollectionLogger determines that
+   * the server is running low on memory. This is useful for iterators that 
aggregate KV pairs or
+   * perform long running operations that create a lot of garbage. Server side 
iterators can
+   * override this method and return the value of 
IteratorEnvironment.isRunningLowOnMemory.
+   *
+   * @return true if running in server process and server is running low on 
memory
+   * @since 3.0.0
+   */
+  default boolean isRunningLowOnMemory() {
+    return false;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
index eb84bdc2c1..f7edbb4ad4 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.data.Value;
 public abstract class WrappingIterator implements 
SortedKeyValueIterator<Key,Value> {
 
   private SortedKeyValueIterator<Key,Value> source = null;
+  private IteratorEnvironment env = null;
   boolean seenSeek = false;
 
   protected void setSource(SortedKeyValueIterator<Key,Value> source) {
@@ -89,7 +90,7 @@ public abstract class WrappingIterator implements 
SortedKeyValueIterator<Key,Val
   public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
       IteratorEnvironment env) throws IOException {
     this.setSource(source);
-
+    this.env = env;
   }
 
   @Override
@@ -107,4 +108,12 @@ public abstract class WrappingIterator implements 
SortedKeyValueIterator<Key,Val
     seenSeek = true;
   }
 
+  @Override
+  public boolean isRunningLowOnMemory() {
+    if (env == null) {
+      return SortedKeyValueIterator.super.isRunningLowOnMemory();
+    }
+    return env.isRunningLowOnMemory();
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index ef5b444724..e0b1bd65a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -425,6 +425,20 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>Counter</td>
  * <td></td>
  * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_SCAN_PAUSED_FOR_MEM}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_SCAN_RETURN_FOR_MEM}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- major compactions -->
  * <tr>
  * <td>{i|e}_{compactionServiceName}_{executor_name}_queued</td>
@@ -442,6 +456,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>The compaction service information is in a tag:
  * id={i|e}_{compactionServiceName}_{executor_name}</td>
  * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td>{@link #METRICS_MAJC_PAUSED}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- minor compactions -->
  * <tr>
  * <td>Queue</td>
@@ -457,6 +478,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>Timer</td>
  * <td></td>
  * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td>{@link #METRICS_MINC_PAUSED}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- Updates (ingest) -->
  * <tr>
  * <td>permissionErrors</td>
@@ -595,10 +623,12 @@ public interface MetricsProducer {
   String METRICS_MAJC_PREFIX = "accumulo.tserver.compactions.majc.";
   String METRICS_MAJC_QUEUED = METRICS_MAJC_PREFIX + "queued";
   String METRICS_MAJC_RUNNING = METRICS_MAJC_PREFIX + "running";
+  String METRICS_MAJC_PAUSED = METRICS_MAJC_PREFIX + "paused";
 
   String METRICS_MINC_PREFIX = "accumulo.tserver.compactions.minc.";
   String METRICS_MINC_QUEUED = METRICS_MINC_PREFIX + "queued";
   String METRICS_MINC_RUNNING = METRICS_MINC_PREFIX + "running";
+  String METRICS_MINC_PAUSED = METRICS_MINC_PREFIX + "paused";
 
   String METRICS_SCAN = "accumulo.tserver.scans";
   String METRICS_SCAN_OPEN_FILES = METRICS_SCAN + ".files.open";
@@ -608,6 +638,8 @@ public interface MetricsProducer {
   String METRICS_SCAN_CONTINUE = METRICS_SCAN + ".continue";
   String METRICS_SCAN_CLOSE = METRICS_SCAN + ".close";
   String METRICS_SCAN_BUSY_TIMEOUT = METRICS_SCAN + ".busy_timeout";
+  String METRICS_SCAN_PAUSED_FOR_MEM = METRICS_SCAN + ".paused.for.memory";
+  String METRICS_SCAN_RETURN_FOR_MEM = METRICS_SCAN + 
".return.early.for.memory";
 
   String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
   String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries";
diff --git 
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
 
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
index dc534d2347..3c48fe14c0 100644
--- 
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
+++ 
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
@@ -39,6 +39,7 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
   private static final org.apache.thrift.protocol.TField 
ENTRIES_WRITTEN_FIELD_DESC = new 
org.apache.thrift.protocol.TField("entriesWritten", 
org.apache.thrift.protocol.TType.I64, (short)9);
   private static final org.apache.thrift.protocol.TField SSI_LIST_FIELD_DESC = 
new org.apache.thrift.protocol.TField("ssiList", 
org.apache.thrift.protocol.TType.LIST, (short)10);
   private static final org.apache.thrift.protocol.TField SSIO_FIELD_DESC = new 
org.apache.thrift.protocol.TField("ssio", org.apache.thrift.protocol.TType.MAP, 
(short)11);
+  private static final org.apache.thrift.protocol.TField 
TIMES_PAUSED_FIELD_DESC = new org.apache.thrift.protocol.TField("timesPaused", 
org.apache.thrift.protocol.TType.I64, (short)12);
 
   private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new ActiveCompactionStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new ActiveCompactionTupleSchemeFactory();
@@ -62,6 +63,7 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
   public long entriesWritten; // required
   public @org.apache.thrift.annotation.Nullable 
java.util.List<org.apache.accumulo.core.dataImpl.thrift.IterInfo> ssiList; // 
required
   public @org.apache.thrift.annotation.Nullable 
java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
 ssio; // required
+  public long timesPaused; // required
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -83,7 +85,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     ENTRIES_READ((short)8, "entriesRead"),
     ENTRIES_WRITTEN((short)9, "entriesWritten"),
     SSI_LIST((short)10, "ssiList"),
-    SSIO((short)11, "ssio");
+    SSIO((short)11, "ssio"),
+    TIMES_PAUSED((short)12, "timesPaused");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new 
java.util.HashMap<java.lang.String, _Fields>();
 
@@ -121,6 +124,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
           return SSI_LIST;
         case 11: // SSIO
           return SSIO;
+        case 12: // TIMES_PAUSED
+          return TIMES_PAUSED;
         default:
           return null;
       }
@@ -167,6 +172,7 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
   private static final int __AGE_ISSET_ID = 0;
   private static final int __ENTRIESREAD_ISSET_ID = 1;
   private static final int __ENTRIESWRITTEN_ISSET_ID = 2;
+  private static final int __TIMESPAUSED_ISSET_ID = 3;
   private byte __isset_bitfield = 0;
   public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -199,6 +205,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
             new 
org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
                 new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
 
                 new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))));
+    tmpMap.put(_Fields.TIMES_PAUSED, new 
org.apache.thrift.meta_data.FieldMetaData("timesPaused", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ActiveCompaction.class,
 metaDataMap);
   }
@@ -217,7 +225,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     long entriesRead,
     long entriesWritten,
     java.util.List<org.apache.accumulo.core.dataImpl.thrift.IterInfo> ssiList,
-    
java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
 ssio)
+    
java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
 ssio,
+    long timesPaused)
   {
     this();
     this.extent = extent;
@@ -234,6 +243,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     setEntriesWrittenIsSet(true);
     this.ssiList = ssiList;
     this.ssio = ssio;
+    this.timesPaused = timesPaused;
+    setTimesPausedIsSet(true);
   }
 
   /**
@@ -285,6 +296,7 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
       }
       this.ssio = __this__ssio;
     }
+    this.timesPaused = other.timesPaused;
   }
 
   @Override
@@ -308,6 +320,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     this.entriesWritten = 0;
     this.ssiList = null;
     this.ssio = null;
+    setTimesPausedIsSet(false);
+    this.timesPaused = 0;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -638,6 +652,29 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     }
   }
 
+  public long getTimesPaused() {
+    return this.timesPaused;
+  }
+
+  public ActiveCompaction setTimesPaused(long timesPaused) {
+    this.timesPaused = timesPaused;
+    setTimesPausedIsSet(true);
+    return this;
+  }
+
+  public void unsetTimesPaused() {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, 
__TIMESPAUSED_ISSET_ID);
+  }
+
+  /** Returns true if field timesPaused is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetTimesPaused() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, 
__TIMESPAUSED_ISSET_ID);
+  }
+
+  public void setTimesPausedIsSet(boolean value) {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, 
__TIMESPAUSED_ISSET_ID, value);
+  }
+
   @Override
   public void setFieldValue(_Fields field, 
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
@@ -729,6 +766,14 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
       }
       break;
 
+    case TIMES_PAUSED:
+      if (value == null) {
+        unsetTimesPaused();
+      } else {
+        setTimesPaused((java.lang.Long)value);
+      }
+      break;
+
     }
   }
 
@@ -769,6 +814,9 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     case SSIO:
       return getSsio();
 
+    case TIMES_PAUSED:
+      return getTimesPaused();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -803,6 +851,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
       return isSetSsiList();
     case SSIO:
       return isSetSsio();
+    case TIMES_PAUSED:
+      return isSetTimesPaused();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -919,6 +969,15 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
         return false;
     }
 
+    boolean this_present_timesPaused = true;
+    boolean that_present_timesPaused = true;
+    if (this_present_timesPaused || that_present_timesPaused) {
+      if (!(this_present_timesPaused && that_present_timesPaused))
+        return false;
+      if (this.timesPaused != that.timesPaused)
+        return false;
+    }
+
     return true;
   }
 
@@ -964,6 +1023,8 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
     if (isSetSsio())
       hashCode = hashCode * 8191 + ssio.hashCode();
 
+    hashCode = hashCode * 8191 + 
org.apache.thrift.TBaseHelper.hashCode(timesPaused);
+
     return hashCode;
   }
 
@@ -1085,6 +1146,16 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetTimesPaused(), 
other.isSetTimesPaused());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimesPaused()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.timesPaused, other.timesPaused);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1184,6 +1255,10 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
       sb.append(this.ssio);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("timesPaused:");
+    sb.append(this.timesPaused);
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1368,6 +1443,14 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 12: // TIMES_PAUSED
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timesPaused = iprot.readI64();
+              struct.setTimesPausedIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -1463,6 +1546,9 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
         }
         oprot.writeFieldEnd();
       }
+      oprot.writeFieldBegin(TIMES_PAUSED_FIELD_DESC);
+      oprot.writeI64(struct.timesPaused);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1515,7 +1601,10 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
       if (struct.isSetSsio()) {
         optionals.set(10);
       }
-      oprot.writeBitSet(optionals, 11);
+      if (struct.isSetTimesPaused()) {
+        optionals.set(11);
+      }
+      oprot.writeBitSet(optionals, 12);
       if (struct.isSetExtent()) {
         struct.extent.write(oprot);
       }
@@ -1575,12 +1664,15 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
           }
         }
       }
+      if (struct.isSetTimesPaused()) {
+        oprot.writeI64(struct.timesPaused);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, 
ActiveCompaction struct) throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(11);
+      java.util.BitSet incoming = iprot.readBitSet(12);
       if (incoming.get(0)) {
         struct.extent = new 
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent();
         struct.extent.read(iprot);
@@ -1667,6 +1759,10 @@ public class ActiveCompaction implements 
org.apache.thrift.TBase<ActiveCompactio
         }
         struct.setSsioIsSet(true);
       }
+      if (incoming.get(11)) {
+        struct.timesPaused = iprot.readI64();
+        struct.setTimesPausedIsSet(true);
+      }
     }
   }
 
diff --git a/core/src/main/thrift/tabletserver.thrift 
b/core/src/main/thrift/tabletserver.thrift
index 445211ad61..0098a9ca93 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -83,6 +83,7 @@ struct ActiveCompaction {
   9:i64 entriesWritten
   10:list<data.IterInfo> ssiList
   11:map<string, map<string, string>> ssio
+  12:i64 timesPaused
 }
 
 struct TIteratorSetting {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java 
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 9d1d4c4a0d..6b47fd5fea 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.server;
 
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
@@ -27,6 +29,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +59,11 @@ public abstract class AbstractServer implements 
AutoCloseable, Runnable {
       // Server-side "client" check to make sure we're logged in as a user we 
expect to be
       context.enforceKerberosLogin();
     }
+    final LowMemoryDetector lmd = context.getLowMemoryDetector();
+    ScheduledFuture<?> future = 
context.getScheduledExecutor().scheduleWithFixedDelay(
+        () -> lmd.logGCInfo(context.getConfiguration()), 0,
+        lmd.getIntervalMillis(context.getConfiguration()), 
TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   /**
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java 
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index e2286115c2..90cb171b88 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.conf.store.PropStore;
 import org.apache.accumulo.server.conf.store.impl.ZooPropStore;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.metadata.ServerAmpleImpl;
 import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -102,6 +103,7 @@ public class ServerContext extends ClientContext {
   private final Supplier<ScheduledThreadPoolExecutor> 
sharedScheduledThreadPool;
   private final Supplier<AuditedSecurityOperation> securityOperation;
   private final Supplier<CryptoServiceFactory> cryptoFactorySupplier;
+  private final Supplier<LowMemoryDetector> lowMemoryDetector;
 
   public ServerContext(SiteConfiguration siteConfig) {
     this(new ServerInfo(siteConfig));
@@ -127,6 +129,7 @@ public class ServerContext extends ClientContext {
     securityOperation =
         memoize(() -> new AuditedSecurityOperation(this, 
SecurityOperation.getAuthorizor(this),
             SecurityOperation.getAuthenticator(this), 
SecurityOperation.getPermHandler(this)));
+    lowMemoryDetector = memoize(() -> new LowMemoryDetector());
   }
 
   /**
@@ -453,4 +456,8 @@ public class ServerContext extends ClientContext {
     return zkUserPath.get();
   }
 
+  public LowMemoryDetector getLowMemoryDetector() {
+    return lowMemoryDetector.get();
+  }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index 500653964d..05b8fa2b72 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -38,12 +38,14 @@ public class CompactionInfo {
   private final String localityGroup;
   private final long entriesRead;
   private final long entriesWritten;
+  private final long timesPaused;
   private final TCompactionReason reason;
 
   CompactionInfo(FileCompactor compactor) {
     this.localityGroup = compactor.getCurrentLocalityGroup();
     this.entriesRead = compactor.getEntriesRead();
     this.entriesWritten = compactor.getEntriesWritten();
+    this.timesPaused = compactor.getTimesPaused();
     this.reason = compactor.getReason();
     this.compactor = compactor;
   }
@@ -64,6 +66,10 @@ public class CompactionInfo {
     return entriesWritten;
   }
 
+  public long getTimesPaused() {
+    return timesPaused;
+  }
+
   public Thread getThread() {
     return compactor.thread;
   }
@@ -100,6 +106,6 @@ public class CompactionInfo {
         .collect(Collectors.toList());
     return new ActiveCompaction(compactor.extent.toThrift(),
         System.currentTimeMillis() - compactor.getStartTime(), files, 
compactor.getOutputFile(),
-        type, reason, localityGroup, entriesRead, entriesWritten, iiList, 
iterOptions);
+        type, reason, localityGroup, entriesRead, entriesWritten, iiList, 
iterOptions, timesPaused);
   }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
index e3ccfdd798..9704726d2f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
@@ -22,10 +22,12 @@ public class CompactionStats {
   private long entriesRead;
   private long entriesWritten;
   private long fileSize;
+  private int timesPaused;
 
-  public CompactionStats(long er, long ew) {
+  public CompactionStats(long er, long ew, int tp) {
     this.setEntriesRead(er);
     this.setEntriesWritten(ew);
+    this.setTimesPaused(tp);
   }
 
   public CompactionStats() {}
@@ -46,9 +48,18 @@ public class CompactionStats {
     return entriesWritten;
   }
 
+  public long getTimesPaused() {
+    return timesPaused;
+  }
+
+  public void setTimesPaused(int timesPaused) {
+    this.timesPaused = timesPaused;
+  }
+
   public void add(CompactionStats mcs) {
     this.entriesRead += mcs.entriesRead;
     this.entriesWritten += mcs.entriesWritten;
+    this.timesPaused += mcs.timesPaused;
   }
 
   public void setFileSize(long fileSize) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 27eef8def0..f8ba0011c9 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -67,6 +68,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
@@ -113,6 +115,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   protected final KeyExtent extent;
   private final List<IteratorSetting> iterators;
   private final CryptoService cryptoService;
+  private final PausedCompactionMetrics metrics;
 
   // things to report
   private String currentLocalityGroup = "";
@@ -120,6 +123,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
   private final AtomicLong entriesRead = new AtomicLong(0);
   private final AtomicLong entriesWritten = new AtomicLong(0);
+  private final AtomicInteger timesPaused = new AtomicInteger(0);
   private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS");
 
   // a unique id to identify a compactor
@@ -142,6 +146,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   private void clearStats() {
     entriesRead.set(0);
     entriesWritten.set(0);
+    timesPaused.set(0);
   }
 
   protected static final Set<FileCompactor> runningCompactions =
@@ -162,7 +167,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   public FileCompactor(ServerContext context, KeyExtent extent,
       Map<StoredTabletFile,DataFileValue> files, TabletFile outputFile, 
boolean propagateDeletes,
       CompactionEnv env, List<IteratorSetting> iterators, 
AccumuloConfiguration tableConfiguation,
-      CryptoService cs) {
+      CryptoService cs, PausedCompactionMetrics metrics) {
     this.context = context;
     this.extent = extent;
     this.fs = context.getVolumeManager();
@@ -173,6 +178,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     this.env = env;
     this.iterators = iterators;
     this.cryptoService = cs;
+    this.metrics = metrics;
 
     startTime = System.currentTimeMillis();
   }
@@ -268,10 +274,11 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
       log.trace(String.format(
           "Compaction %s %,d read | %,d written | %,6d entries/sec"
-              + " | %,6.3f secs | %,12d bytes | %9.3f byte/sec",
+              + " | %,6.3f secs | %,12d bytes | %9.3f byte/sec | %,d paused",
           extent, majCStats.getEntriesRead(), majCStats.getEntriesWritten(),
           (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) 
/ 1000.0,
-          mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0)));
+          mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0),
+          majCStats.getTimesPaused()));
 
       majCStats.setFileSize(mfwTmp.getLength());
       return majCStats;
@@ -406,9 +413,31 @@ public class FileCompactor implements 
Callable<CompactionStats> {
         mfw.startDefaultLocalityGroup();
       }
 
+      DetectionScope scope =
+          env.getIteratorScope() == IteratorScope.minc ? DetectionScope.MINC : 
DetectionScope.MAJC;
       Span writeSpan = TraceUtil.startSpan(this.getClass(), "write");
       try (Scope write = writeSpan.makeCurrent()) {
         while (itr.hasTop() && env.isCompactionEnabled()) {
+
+          while (context.getLowMemoryDetector().isRunningLowOnMemory(context, 
scope, () -> {
+            return !extent.isMeta();
+          }, () -> {
+            log.info("Pausing compaction because low on memory, extent: {}", 
extent);
+            timesPaused.incrementAndGet();
+            if (scope == DetectionScope.MINC) {
+              metrics.incrementMinCPause();
+            } else {
+              metrics.incrementMajCPause();
+            }
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new IllegalStateException(
+                  "Interrupted while waiting for low memory condition to 
resolve", e);
+            }
+          })) {}
+
           mfw.append(itr.getTopKey(), itr.getTopValue());
           itr.next();
           entriesCompacted++;
@@ -436,12 +465,12 @@ public class FileCompactor implements 
Callable<CompactionStats> {
         }
 
       } finally {
-        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), 
entriesCompacted);
+        CompactionStats lgMajcStats =
+            new CompactionStats(citr.getCount(), entriesCompacted, 
timesPaused.get());
         majCStats.add(lgMajcStats);
         writeSpan.end();
       }
-
-    } catch (Exception e) {
+    } catch (IOException | CompactionCanceledException e) {
       TraceUtil.setException(compactSpan, e, true);
       throw e;
     } finally {
@@ -477,6 +506,10 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     return entriesWritten.get();
   }
 
+  long getTimesPaused() {
+    return timesPaused.get();
+  }
+
   long getStartTime() {
     return startTime;
   }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
similarity index 50%
copy from 
server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
copy to 
server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
index e3ccfdd798..b8731cda08 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
@@ -18,44 +18,31 @@
  */
 package org.apache.accumulo.server.compaction;
 
-public class CompactionStats {
-  private long entriesRead;
-  private long entriesWritten;
-  private long fileSize;
-
-  public CompactionStats(long er, long ew) {
-    this.setEntriesRead(er);
-    this.setEntriesWritten(ew);
-  }
-
-  public CompactionStats() {}
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
 
-  private void setEntriesRead(long entriesRead) {
-    this.entriesRead = entriesRead;
-  }
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
 
-  public long getEntriesRead() {
-    return entriesRead;
-  }
+public class PausedCompactionMetrics implements MetricsProducer {
 
-  private void setEntriesWritten(long entriesWritten) {
-    this.entriesWritten = entriesWritten;
-  }
+  private Counter majcPauses;
+  private Counter mincPauses;
 
-  public long getEntriesWritten() {
-    return entriesWritten;
+  public void incrementMinCPause() {
+    mincPauses.increment();
   }
 
-  public void add(CompactionStats mcs) {
-    this.entriesRead += mcs.entriesRead;
-    this.entriesWritten += mcs.entriesWritten;
+  public void incrementMajCPause() {
+    majcPauses.increment();
   }
 
-  public void setFileSize(long fileSize) {
-    this.fileSize = fileSize;
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major 
compaction pause count")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
+    mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor 
compactor pause count")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
   }
 
-  public long getFileSize() {
-    return this.fileSize;
-  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
 
b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
index 72daff609c..e13fa93e80 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
@@ -30,4 +30,9 @@ public interface SystemIteratorEnvironment extends 
IteratorEnvironment {
 
   SortedKeyValueIterator<Key,Value> 
getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter);
 
+  @Override
+  default boolean isRunningLowOnMemory() {
+    return getServerContext().getLowMemoryDetector().isRunningLowOnMemory();
+  }
+
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
 
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
similarity index 58%
rename from 
server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
rename to 
server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index 469ec5632d..362f6ef173 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -16,35 +16,94 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.server;
+package org.apache.accumulo.server.mem;
 
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GarbageCollectionLogger {
-  private static final Logger log = 
LoggerFactory.getLogger(GarbageCollectionLogger.class);
+public class LowMemoryDetector {
+
+  @FunctionalInterface
+  public static interface Action {
+    void execute();
+  }
+
+  public enum DetectionScope {
+    MINC, MAJC, SCAN
+  };
+
+  private static final Logger log = 
LoggerFactory.getLogger(LowMemoryDetector.class);
 
   private final HashMap<String,Long> prevGcTime = new HashMap<>();
   private long lastMemorySize = 0;
   private long gcTimeIncreasedCount = 0;
-  private static long lastMemoryCheckTime = 0;
-  private static final Lock memCheckTimeLock = new ReentrantLock();
+  private long lastMemoryCheckTime = 0;
+  private final Lock memCheckTimeLock = new ReentrantLock();
+  private volatile boolean runningLowOnMemory = false;
+
+  public long getIntervalMillis(AccumuloConfiguration conf) {
+    return conf.getTimeInMillis(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL);
+  }
+
+  public boolean isRunningLowOnMemory() {
+    return runningLowOnMemory;
+  }
+
+  /**
+   * @param context server context
+   * @param scope whether this is being checked in the context of scan or 
compact code
+   * @param isUserTable boolean as to whether the table being scanned / 
compacted is a user table.
+   *        No action is taken for system tables.
+   * @param action Action to perform when this method returns true
+   * @return true if server running low on memory
+   */
+  public boolean isRunningLowOnMemory(ServerContext context, DetectionScope 
scope,
+      Supplier<Boolean> isUserTable, Action action) {
+    if (isUserTable.get()) {
+      Property p = null;
+      switch (scope) {
+        case SCAN:
+          p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION;
+          break;
+        case MINC:
+          p = Property.GENERAL_LOW_MEM_MINC_PROTECTION;
+          break;
+        case MAJC:
+          p = Property.GENERAL_LOW_MEM_MAJC_PROTECTION;
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown scope: " + scope);
+      }
+      boolean isEnabled = context.getConfiguration().getBoolean(p);
+      // Only incur the penalty of accessing the volatile variable when 
enabled for this scope
+      if (isEnabled) {
+        action.execute();
+        return runningLowOnMemory;
+      }
+    }
+    return false;
+  }
 
   public void logGCInfo(AccumuloConfiguration conf) {
 
+    Double freeMemoryPercentage = 
conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
+
     memCheckTimeLock.lock();
     try {
-      final long now = System.currentTimeMillis();
+      final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
       List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
       Runtime rt = Runtime.getRuntime();
@@ -81,9 +140,19 @@ public class GarbageCollectionLogger {
         gcTimeIncreasedCount = 0;
       } else {
         gcTimeIncreasedCount++;
-        if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
+        if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 
freeMemoryPercentage) {
+          runningLowOnMemory = true;
           log.warn("Running low on memory");
           gcTimeIncreasedCount = 0;
+        } else {
+          // If we were running low on memory, but are not any longer, than 
log at warn
+          // so that it shows up in the logs
+          if (runningLowOnMemory) {
+            log.warn("Recovered from low memory condition");
+          } else {
+            log.trace("Not running low on memory");
+          }
+          runningLowOnMemory = false;
         }
       }
 
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5c73ed4b46..1705701ec1 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -77,7 +77,6 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@ -101,7 +100,6 @@ public class CompactionCoordinator extends AbstractServer
     implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
@@ -122,7 +120,6 @@ public class CompactionCoordinator extends AbstractServer
   /* Map of queue name to last time compactor called to get a compaction job */
   private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 
-  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   protected SecurityOperation security;
   protected final AccumuloConfiguration aconf;
   protected CompactionFinalizer compactionFinalizer;
@@ -146,7 +143,6 @@ public class CompactionCoordinator extends AbstractServer
     compactionFinalizer = createCompactionFinalizer(schedExecutor);
     tserverSet = createLiveTServerSet();
     setupSecurity();
-    startGCLogger(schedExecutor);
     printStartupMsg();
     startCompactionCleaner(schedExecutor);
     startRunningCleaner(schedExecutor);
@@ -170,13 +166,6 @@ public class CompactionCoordinator extends AbstractServer
     security = getContext().getSecurityOperation();
   }
 
-  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
-            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
   protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
     ScheduledFuture<?> future =
         schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, 
TimeUnit.MINUTES);
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 7225960a53..746f552a3c 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -127,9 +127,6 @@ public class CompactionCoordinatorTest {
     @Override
     protected void setupSecurity() {}
 
-    @Override
-    protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
-
     @Override
     protected void printStartupMsg() {}
 
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index a9f0bfbf40..f09484cb9d 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,10 +92,10 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
 import org.apache.accumulo.server.compaction.FileCompactor;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import 
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.conf.TableConfiguration;
@@ -123,14 +122,12 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
   private static final SecureRandom random = new SecureRandom();
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5);
 
   private static final long TEN_MEGABYTES = 10485760;
 
   protected static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
 
-  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
@@ -141,6 +138,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
   private SecurityOperation security;
   private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;
+  private PausedCompactionMetrics pausedMetrics;
 
   // Exposed for tests
   protected volatile boolean shutdown = false;
@@ -159,7 +157,6 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     watcher = new CompactionWatcher(aconf);
     var schedExecutor =
         
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-    startGCLogger(schedExecutor);
     startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
     printStartupMsg();
   }
@@ -180,13 +177,6 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     security = getContext().getSecurityOperation();
   }
 
-  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
-            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
   protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
       long timeBetweenChecks) {
     
ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
@@ -275,7 +265,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
       public void lostLock(final LockLossReason reason) {
         Halt.halt(1, () -> {
           LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-          gcLogger.logGCInfo(getConfiguration());
+          getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
         });
       }
 
@@ -526,8 +516,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
             .forEach(tis -> 
iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
 
         ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
-        FileCompactor compactor = new FileCompactor(getContext(), extent, 
files, outputFile,
-            job.isPropagateDeletes(), cenv, iters, aConfig, 
tConfig.getCryptoService());
+        FileCompactor compactor =
+            new FileCompactor(getContext(), extent, files, outputFile, 
job.isPropagateDeletes(),
+                cenv, iters, aConfig, tConfig.getCryptoService(), 
pausedMetrics);
 
         LOG.trace("Starting compactor");
         started.countDown();
@@ -610,7 +601,8 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         | SecurityException e1) {
       LOG.error("Error initializing metrics, metrics will not be emitted.", 
e1);
     }
-    MetricsUtil.initializeProducers(this);
+    pausedMetrics = new PausedCompactionMetrics();
+    MetricsUtil.initializeProducers(this, pausedMetrics);
 
     LOG.info("Compactor started, waiting for work");
     try {
@@ -671,9 +663,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
                       Float.toString((info.getEntriesRead() / (float) 
inputEntries) * 100);
                 }
                 String message = String.format(
-                    "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries",
+                    "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries, paused %d times",
                     info.getEntriesRead(), inputEntries, percentComplete, "%",
-                    info.getEntriesWritten());
+                    info.getEntriesWritten(), info.getTimesPaused());
                 watcher.run();
                 try {
                   LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
@@ -785,7 +777,7 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
         LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
       }
 
-      gcLogger.logGCInfo(getConfiguration());
+      getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
       try {
         if (null != compactorLock) {
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 11b68e007a..e61f9f1903 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -28,7 +28,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Supplier;
@@ -51,6 +50,7 @@ import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerContext;
 import 
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -192,9 +192,6 @@ public class CompactorTest {
     @Override
     protected void setupSecurity() {}
 
-    @Override
-    protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {}
-
     @Override
     protected void printStartupMsg() {}
 
@@ -335,6 +332,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new 
LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -383,6 +381,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new 
LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -431,6 +430,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new 
LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 5a2608b1b1..77047f013b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -87,8 +87,8 @@ import 
org.apache.accumulo.core.util.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.rpc.ServerAddress;
@@ -182,7 +182,6 @@ public class ScanServer extends AbstractServer
   private final SessionManager sessionManager;
   private final TabletServerResourceManager resourceManager;
   HostAndPort clientAddress;
-  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
 
   private volatile boolean serverStopRequested = false;
   private ServiceLock scanServerLock;
@@ -313,7 +312,7 @@ public class ScanServer extends AbstractServer
             if (!serverStopRequested) {
               LOG.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
             }
-            gcLogger.logGCInfo(getConfiguration());
+            context.getLowMemoryDetector().logGCInfo(getConfiguration());
           });
         }
 
@@ -392,7 +391,7 @@ public class ScanServer extends AbstractServer
         LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
       }
 
-      gcLogger.logGCInfo(getConfiguration());
+      context.getLowMemoryDetector().logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
       try {
         if (null != lock) {
@@ -992,6 +991,12 @@ public class ScanServer extends AbstractServer
     return scanMetrics;
   }
 
+  @Override
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    // ScanServer does not perform compactions
+    return null;
+  }
+
   @Override
   public Session getSession(long scanID) {
     return sessionManager.getSession(scanID);
@@ -1012,11 +1017,6 @@ public class ScanServer extends AbstractServer
     return managerLockCache;
   }
 
-  @Override
-  public GarbageCollectionLogger getGcLogger() {
-    return gcLogger;
-  }
-
   @Override
   public BlockCacheConfiguration 
getBlockCacheConfiguration(AccumuloConfiguration acuConf) {
     return BlockCacheConfiguration.forScanServer(acuConf);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 1f6a649a4c..09aa2169c4 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1072,7 +1072,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       Halt.halt(1, () -> {
         log.info("Tablet server no longer holds lock during checkPermission() 
: {}, exiting",
             request);
-        server.getGcLogger().logGCInfo(server.getConfiguration());
+        context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
       });
     }
 
@@ -1260,7 +1260,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
 
     Halt.halt(0, () -> {
       log.info("Manager requested tablet server halt");
-      server.gcLogger.logGCInfo(server.getConfiguration());
+      context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
       server.requestStop();
       try {
         server.getLock().unlock();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
index 3715eac0c2..bbff2e1377 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
@@ -24,8 +24,8 @@ import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.scan.ScanServerInfo;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.session.Session;
@@ -50,6 +50,8 @@ public interface TabletHostingServer {
 
   TabletServerScanMetrics getScanMetrics();
 
+  PausedCompactionMetrics getPausedCompactionMetrics();
+
   Session getSession(long scanID);
 
   TableConfiguration getTableConfiguration(KeyExtent threadPoolExtent);
@@ -58,7 +60,5 @@ public interface TabletHostingServer {
 
   ZooCache getManagerLockCache();
 
-  GarbageCollectionLogger getGcLogger();
-
   BlockCacheManager.Configuration 
getBlockCacheConfiguration(AccumuloConfiguration acuConf);
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1f614035f1..521335e1fc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -109,11 +109,11 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -175,10 +175,8 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
 
   private static final SecureRandom random = new SecureRandom();
   private static final Logger log = 
LoggerFactory.getLogger(TabletServer.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 
TimeUnit.SECONDS.toMillis(5);
   private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 
TimeUnit.HOURS.toMillis(1);
 
-  final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   final ZooCache managerLockCache;
 
   final TabletServerLogger logger;
@@ -188,6 +186,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
   TabletServerScanMetrics scanMetrics;
   TabletServerMinCMetrics mincMetrics;
   CompactionExecutorsMetrics ceMetrics;
+  PausedCompactionMetrics pausedMetrics;
 
   @Override
   public TabletServerScanMetrics getScanMetrics() {
@@ -198,6 +197,11 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
     return mincMetrics;
   }
 
+  @Override
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    return pausedMetrics;
+  }
+
   private final LogSorter logSorter;
   final TabletStatsKeeper statsKeeper;
   private final AtomicInteger logIdGenerator = new AtomicInteger();
@@ -612,11 +616,6 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
     return managerLockCache;
   }
 
-  @Override
-  public GarbageCollectionLogger getGcLogger() {
-    return gcLogger;
-  }
-
   private void announceExistence() {
     ZooReaderWriter zoo = getContext().getZooReaderWriter();
     try {
@@ -644,7 +643,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
             if (!serverStopRequested) {
               log.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
             }
-            gcLogger.logGCInfo(getConfiguration());
+            context.getLowMemoryDetector().logGCInfo(getConfiguration());
           });
         }
 
@@ -717,7 +716,9 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
     scanMetrics = new TabletServerScanMetrics();
     mincMetrics = new TabletServerMinCMetrics();
     ceMetrics = new CompactionExecutorsMetrics();
-    MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, 
mincMetrics, ceMetrics);
+    pausedMetrics = new PausedCompactionMetrics();
+    MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, 
mincMetrics, ceMetrics,
+        pausedMetrics);
 
     this.compactionManager = new CompactionManager(() -> Iterators
         .transform(onlineTablets.snapshot().values().iterator(), 
Tablet::asCompactable),
@@ -889,7 +890,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
       log.warn("Failed to close filesystem : {}", e.getMessage(), e);
     }
 
-    gcLogger.logGCInfo(getConfiguration());
+    context.getLowMemoryDetector().logGCInfo(getConfiguration());
 
     log.info("TServerInfo: stop requested. exiting ... ");
 
@@ -955,12 +956,6 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
     Threads.createThread("Split/MajC initiator", new 
MajorCompactor(context)).start();
 
     clientAddress = HostAndPort.fromParts(getHostname(), 0);
-
-    Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
-
-    ScheduledFuture<?> future = 
context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
-        0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    watchNonCriticalScheduledTask(future);
   }
 
   public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> 
scanCounts) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index f39df6789f..9957eb1dee 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -370,6 +370,7 @@ public class CompactionManager {
     public CompactionExecutorId ceid;
     public int running;
     public int queued;
+    public int paused;
   }
 
   public Collection<ExtCompMetric> getExternalMetrics() {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index eef8c62315..57e33d2334 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -41,6 +41,8 @@ public class TabletServerScanMetrics implements 
MetricsProducer {
   private Counter continueScanCalls;
   private Counter closeScanCalls;
   private Counter busyTimeoutReturned;
+  private Counter pausedForMemory;
+  private Counter earlyReturnForMemory;
 
   private final LongAdder lookupCount = new LongAdder();
   private final LongAdder queryResultCount = new LongAdder();
@@ -119,6 +121,14 @@ public class TabletServerScanMetrics implements 
MetricsProducer {
     busyTimeoutReturned.increment(value);
   }
 
+  public void incrementScanPausedForLowMemory() {
+    pausedForMemory.increment();
+  }
+
+  public void incrementEarlyReturnForLowMemory() {
+    earlyReturnForMemory.increment();
+  }
+
   @Override
   public void registerMetrics(MeterRegistry registry) {
     Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::get)
@@ -150,6 +160,12 @@ public class TabletServerScanMetrics implements 
MetricsProducer {
         .description("Query rate (bytes/sec)").register(registry);
     Gauge.builder(METRICS_TSERVER_SCANNED_ENTRIES, this, 
TabletServerScanMetrics::getScannedCount)
         .description("Scanned rate").register(registry);
+    pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM)
+        .description("scan paused due to server being low on memory")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
+    earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM)
+        .description("scan returned results early due to server being low on 
memory")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
   }
 
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 195625c32d..bb62af44a0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -430,7 +430,7 @@ public class CompactableUtils {
 
     FileCompactor compactor = new FileCompactor(tablet.getContext(), 
tablet.getExtent(),
         compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, 
compactionConfig,
-        tableConf.getCryptoService());
+        tableConf.getCryptoService(), tablet.getPausedCompactionMetrics());
 
     return compactor.call();
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 7a6ee05b6e..0092da7712 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -57,7 +57,7 @@ public class MinorCompactor extends FileCompactor {
       TabletFile outputFile, MinorCompactionReason mincReason, 
TableConfiguration tableConfig) {
     super(tabletServer.getContext(), tablet.getExtent(), 
Collections.emptyMap(), outputFile, true,
         new MinCEnv(mincReason, imm.compactionIterator()), 
Collections.emptyList(), tableConfig,
-        tableConfig.getCryptoService());
+        tableConfig.getCryptoService(), 
tabletServer.getPausedCompactionMetrics());
     this.tabletServer = tabletServer;
     this.mincReason = mincReason;
   }
@@ -146,7 +146,7 @@ public class MinorCompactor extends FileCompactor {
         }
 
         if (isTableDeleting()) {
-          return new CompactionStats(0, 0);
+          return new CompactionStats(0, 0, 0);
         }
 
       } while (true);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2ec3ad4707..12266f5995 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -91,6 +91,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionStats;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
@@ -1955,6 +1956,10 @@ public class Tablet extends TabletBase {
     return getTabletServer().getScanMetrics();
   }
 
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    return getTabletServer().getPausedCompactionMetrics();
+  }
+
   DatafileManager getDatafileManager() {
     return datafileManager;
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index ce8cddc348..1f54e66eac 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.util.ShutdownUtil;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.TooManyFilesException;
+import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope;
 import org.apache.accumulo.tserver.InMemoryMap;
 import org.apache.accumulo.tserver.TabletHostingServer;
 import org.apache.accumulo.tserver.TabletServerResourceManager;
@@ -87,10 +88,13 @@ public abstract class TabletBase {
 
   protected final TableConfiguration tableConfiguration;
 
+  private final boolean isUserTable;
+
   public TabletBase(TabletHostingServer server, KeyExtent extent) {
     this.context = server.getContext();
     this.server = server;
     this.extent = extent;
+    this.isUserTable = !extent.isMeta();
 
     TableConfiguration tblConf = 
context.getTableConfiguration(extent.tableId());
     if (tblConf == null) {
@@ -229,7 +233,19 @@ public abstract class TabletBase {
   Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, 
ScanParameters scanParams)
       throws IOException {
 
-    // log.info("In nextBatch..");
+    while (context.getLowMemoryDetector().isRunningLowOnMemory(context, 
DetectionScope.SCAN, () -> {
+      return isUserTable;
+    }, () -> {
+      log.info("Not starting next batch because low on memory, extent: {}", 
extent);
+      server.getScanMetrics().incrementScanPausedForLowMemory();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for low memory condition to resolve", 
e);
+      }
+    })) {}
 
     long batchTimeOut = scanParams.getBatchTimeOut();
 
@@ -279,7 +295,15 @@ public abstract class TabletBase {
 
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) 
>= timeToRun;
 
-      if (resultSize >= maxResultsSize || results.size() >= 
scanParams.getMaxEntries() || timesUp) {
+      boolean runningLowOnMemory =
+          context.getLowMemoryDetector().isRunningLowOnMemory(context, 
DetectionScope.SCAN, () -> {
+            return isUserTable;
+          }, () -> {
+            log.info("Not continuing next batch because low on memory, extent: 
{}", extent);
+            server.getScanMetrics().incrementEarlyReturnForLowMemory();
+          });
+      if (runningLowOnMemory || resultSize >= maxResultsSize
+          || results.size() >= scanParams.getMaxEntries() || timesUp) {
         continueKey = new Key(key);
         skipContinueKey = true;
         break;
@@ -318,6 +342,20 @@ public abstract class TabletBase {
   private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, 
List<Range> ranges,
       List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) 
throws IOException {
 
+    while (context.getLowMemoryDetector().isRunningLowOnMemory(context, 
DetectionScope.SCAN, () -> {
+      return isUserTable;
+    }, () -> {
+      log.info("Not starting lookup because low on memory, extent: {}", 
extent);
+      server.getScanMetrics().incrementScanPausedForLowMemory();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for low memory condition to resolve", 
e);
+      }
+    })) {}
+
     Tablet.LookupResult lookupResult = new Tablet.LookupResult();
 
     boolean exceededMemoryUsage = false;
@@ -346,7 +384,14 @@ public abstract class TabletBase {
 
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > 
timeToRun;
 
-      if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
+      boolean runningLowOnMemory =
+          context.getLowMemoryDetector().isRunningLowOnMemory(context, 
DetectionScope.SCAN, () -> {
+            return isUserTable;
+          }, () -> {
+            log.info("Not continuing lookup because low on memory, extent: 
{}", extent);
+            server.getScanMetrics().incrementEarlyReturnForLowMemory();
+          });
+      if (runningLowOnMemory || exceededMemoryUsage || tabletClosed || timesUp 
|| yielded) {
         lookupResult.unfinishedRanges.add(range);
         continue;
       }
@@ -377,7 +422,14 @@ public abstract class TabletBase {
 
           timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > 
timeToRun;
 
-          if (exceededMemoryUsage || timesUp) {
+          runningLowOnMemory = 
context.getLowMemoryDetector().isRunningLowOnMemory(context,
+              DetectionScope.SCAN, () -> {
+                return isUserTable;
+              }, () -> {
+                log.info("Not continuing lookup because low on memory, extent: 
{}", extent);
+                server.getScanMetrics().incrementEarlyReturnForLowMemory();
+              });
+          if (runningLowOnMemory || exceededMemoryUsage || timesUp) {
             addUnfinishedRange(lookupResult, range, key);
             break;
           }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
new file mode 100644
index 0000000000..3a7f5f653e
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class MemoryConsumingIterator extends WrappingIterator {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryConsumingIterator.class);
+
+  private static final List<byte[]> BUFFERS = new ArrayList<>();
+
+  public static void freeBuffers() {
+    BUFFERS.clear();
+  }
+
+  @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
+  private int getAmountToConsume() {
+    System.gc();
+    Runtime runtime = Runtime.getRuntime();
+    long maxConfiguredMemory = runtime.maxMemory();
+    long allocatedMemory = runtime.totalMemory();
+    long allocatedFreeMemory = runtime.freeMemory();
+    long freeMemory = maxConfiguredMemory - (allocatedMemory - 
allocatedFreeMemory);
+    long minimumFreeMemoryThreshold =
+        (long) (maxConfiguredMemory * 
MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD);
+
+    int amountToConsume = 0;
+    if (freeMemory > minimumFreeMemoryThreshold) {
+      amountToConsume = (int) (freeMemory - (minimumFreeMemoryThreshold - 
10485760));
+    }
+    if (amountToConsume > Integer.MAX_VALUE) {
+      throw new IllegalStateException(
+          "Unsupported memory size for tablet server when using this 
iterator");
+    }
+    amountToConsume = Math.max(0, amountToConsume);
+    LOG.info("max: {}, free: {}, minFree: {}, amountToConsume: {}", 
maxConfiguredMemory, freeMemory,
+        minimumFreeMemoryThreshold, amountToConsume);
+    return amountToConsume;
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+      throws IOException {
+    LOG.info("seek called");
+    while (!this.isRunningLowOnMemory()) {
+      int amountToConsume = getAmountToConsume();
+      if (amountToConsume > 0) {
+        LOG.info("allocating memory: " + amountToConsume);
+        BUFFERS.add(new byte[amountToConsume]);
+        LOG.info("memory allocated");
+      } else {
+        LOG.info("Waiting for LowMemoryDetector to recognize low on memory 
condition.");
+      }
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+    LOG.info("Running low on memory == true");
+    super.seek(range, columnFamilies, inclusive);
+  }
+
+}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
similarity index 58%
copy from 
server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
copy to 
test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 72daff609c..dde5e1c14b 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -16,18 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.server.iterators;
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 
-public interface SystemIteratorEnvironment extends IteratorEnvironment {
+import com.google.common.util.concurrent.Uninterruptibles;
 
-  ServerContext getServerContext();
+public class MemoryFreeingIterator extends WrappingIterator {
 
-  SortedKeyValueIterator<Key,Value> 
getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter);
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    MemoryConsumingIterator.freeBuffers();
+    while (this.isRunningLowOnMemory()) {
+      // wait for LowMemoryDetector to recognize the memory is free.
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+  }
 
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
new file mode 100644
index 0000000000..da4c98a7a5
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAdder;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer
+      // check on 1s intervals and set low mem condition if more than 80% of
+      // the heap is used.
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_MAJC_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  private static final DoubleAdder MAJC_PAUSED = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if (MetricsProducer.METRICS_MAJC_PAUSED.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              MAJC_PAUSED.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new 
MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset the client side counters
+    MAJC_PAUSED.reset();
+  }
+
+  @Test
+  public void testMajCPauses() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      // Add a small amount of data so that the MemoryConsumingIterator
+      // returns true when trying to consume all of the memory.
+      ReadWriteIT.ingest(client, 1, 1, 1, 0, table);
+
+      AtomicReference<Throwable> error = new AtomicReference<>();
+      Thread compactionThread = new Thread(() -> {
+        try {
+          to.compact(table, new CompactionConfig().setWait(false));
+        } catch (Exception e) {
+          error.set(e);
+        }
+      });
+
+      try (Scanner scanner = client.createScanner(table)) {
+
+        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+
+        Double paused = MAJC_PAUSED.doubleValue();
+        assertEquals(0, paused);
+
+        ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
+        compactionThread.start();
+
+        while (paused == 0) {
+          Thread.sleep(1000);
+          paused = MAJC_PAUSED.doubleValue();
+        }
+        assertTrue(paused > 0);
+
+        MemoryStarvedScanIT.freeServerMemory(client, table);
+        compactionThread.interrupt();
+        compactionThread.join();
+        assertNull(error.get());
+        assertTrue(client.instanceOperations().getActiveCompactions().stream()
+            .filter(ac -> ac.getPausedCount() > 0).findAny().isPresent());
+      }
+    }
+
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
new file mode 100644
index 0000000000..4c91c17635
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAdder;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer
+      // check on 1s intervals and set low mem condition if more than 80% of
+      // the heap is used.
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_MINC_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  private static final DoubleAdder MINC_PAUSED = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if (MetricsProducer.METRICS_MINC_PAUSED.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              MINC_PAUSED.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new 
MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset the client side counters
+    MINC_PAUSED.reset();
+  }
+
+  @Test
+  public void testMinCPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      // Add a small amount of data so that the MemoryConsumingIterator
+      // returns true when trying to consume all of the memory.
+      ReadWriteIT.ingest(client, 1, 1, 1, 0, table);
+
+      AtomicReference<Throwable> error = new AtomicReference<>();
+      Thread ingestThread = new Thread(() -> {
+        try {
+          ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
+          to.flush(table);
+        } catch (Exception e) {
+          error.set(e);
+        }
+      });
+
+      try (Scanner scanner = client.createScanner(table)) {
+
+        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+
+        Double paused = MINC_PAUSED.doubleValue();
+        assertEquals(0, paused);
+
+        ingestThread.start();
+
+        while (paused == 0) {
+          Thread.sleep(1000);
+          paused = MINC_PAUSED.doubleValue();
+        }
+        assertTrue(paused > 0);
+
+        MemoryStarvedScanIT.freeServerMemory(client, table);
+        ingestThread.interrupt();
+        ingestThread.join();
+        assertNull(error.get());
+        assertTrue(client.instanceOperations().getActiveCompactions().stream()
+            .filter(ac -> ac.getPausedCount() > 
0).collect(Collectors.toList()).size() > 0);
+      }
+    }
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
new file mode 100644
index 0000000000..ea67ef3734
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.DoubleAdder;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedScanIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements 
MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer
+      // check on 1s intervals and set low mem condition if more than 80% of
+      // the heap is used.
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_SCAN_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  public static final Double FREE_MEMORY_THRESHOLD = 0.20D;
+
+  private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
+  private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if 
(MetricsProducer.METRICS_SCAN_PAUSED_FOR_MEM.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              SCAN_START_DELAYED.add(val);
+            } else if 
(MetricsProducer.METRICS_SCAN_RETURN_FOR_MEM.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              SCAN_RETURNED_EARLY.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new 
MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset the client side counters
+    SCAN_START_DELAYED.reset();
+    SCAN_START_DELAYED.reset();
+  }
+
+  static void consumeServerMemory(Scanner scanner, String table) throws 
Exception {
+    // This iterator will attempt to consume all free memory in the 
TabletServer
+    scanner.addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
+    scanner.setBatchSize(1);
+    // Set the ReadaheadThreshold to a large number so that another background 
thread
+    // that performs read-ahead of KV pairs is not started.
+    scanner.setReadaheadThreshold(Long.MAX_VALUE);
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    // This should block until the GarbageCollectionLogger runs and notices 
that the
+    // VM is low on memory.
+    assertTrue(iter.hasNext());
+  }
+
+  private void consumeServerMemory(BatchScanner scanner, String table) throws 
Exception {
+    // This iterator will attempt to consume all free memory in the 
TabletServer
+    scanner.addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
+    scanner.setRanges(Collections.singletonList(new Range()));
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    // This should block until the GarbageCollectionLogger runs and notices 
that the
+    // VM is low on memory.
+    assertTrue(iter.hasNext());
+  }
+
+  static void freeServerMemory(AccumuloClient client, String table) throws 
Exception {
+    try (Scanner scanner = client.createScanner(table)) {
+      scanner.addScanIterator(new IteratorSetting(11, 
MemoryFreeingIterator.class, Map.of()));
+      @SuppressWarnings("unused")
+      Iterator<Entry<Key,Value>> iter = scanner.iterator(); // init'ing the 
iterator should be
+                                                            // enough to free 
the memory
+    }
+  }
+
+  @Test
+  public void testScanReturnsEarlyDueToLowMemory() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
+
+      try (Scanner scanner = client.createScanner(table)) {
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+
+        consumeServerMemory(scanner, table);
+
+        // Wait for longer than the memory check interval
+        Thread.sleep(6000);
+
+        // The metric that indicates a scan was returned early due to low 
memory should
+        // have been incremented.
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        freeServerMemory(client, table);
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testScanPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+      try (Scanner dataConsumingScanner = client.createScanner(table);
+          Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+        dataConsumingScanner.addScanIterator(
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"500")));
+        dataConsumingScanner.setBatchSize(1);
+        dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+        AtomicInteger fetched = new AtomicInteger(0);
+        Thread t = new Thread(() -> {
+          int i = 0;
+          while (iter.hasNext()) {
+            iter.next();
+            fetched.set(++i);
+          }
+        });
+
+        memoryConsumingScanner
+            .addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
+        memoryConsumingScanner.setBatchSize(1);
+        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+        t.start();
+
+        // Wait until the dataConsumingScanner has started fetching data
+        int currentCount = fetched.get();
+        while (currentCount == 0) {
+          Thread.sleep(500);
+          currentCount = fetched.get();
+        }
+
+        // This should block until the GarbageCollectionLogger runs and 
notices that the
+        // VM is low on memory.
+        Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
+        assertTrue(consumingIter.hasNext());
+
+        // Confirm that some data was fetched by the memoryConsumingScanner
+        currentCount = fetched.get();
+        assertTrue(currentCount > 0 && currentCount < 100);
+
+        // Grab the current metric counts, wait
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+        Thread.sleep(1500);
+        // One of two conditions could exist here:
+        // The number of fetched rows equals the current count before the wait 
above
+        // and the SCAN_START_DELAYED has been incremented OR the number of 
fetched
+        // rows is one more than the current count and the SCAN_RETURNED_EARLY 
has
+        // been incremented.
+        assertTrue((currentCount == fetched.get() && 
SCAN_START_DELAYED.doubleValue() > paused)
+            || (currentCount + 1 == fetched.get() && 
SCAN_RETURNED_EARLY.doubleValue() > returned));
+        currentCount = fetched.get();
+
+        // Perform the check again
+        paused = SCAN_START_DELAYED.doubleValue();
+        returned = SCAN_RETURNED_EARLY.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+
+        // Free the memory which will allow the pausing scanner to continue
+        freeServerMemory(client, table);
+
+        t.join();
+        assertEquals(30, fetched.get());
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testBatchScanReturnsEarlyDueToLowMemory() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
+
+      try (BatchScanner scanner = client.createBatchScanner(table,
+          client.securityOperations().getUserAuthorizations(client.whoami()), 
1)) {
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+
+        consumeServerMemory(scanner, table);
+
+        // Wait for longer than the memory check interval
+        Thread.sleep(6000);
+
+        // The metric that indicates a scan was returned early due to low 
memory should
+        // have been incremented.
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        freeServerMemory(client, table);
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testBatchScanPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+      try (BatchScanner dataConsumingScanner = 
client.createBatchScanner(table);
+          Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+        dataConsumingScanner.addScanIterator(
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"500")));
+        dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
+        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+        AtomicInteger fetched = new AtomicInteger(0);
+        Thread t = new Thread(() -> {
+          int i = 0;
+          while (iter.hasNext()) {
+            iter.next();
+            fetched.set(++i);
+          }
+        });
+
+        memoryConsumingScanner
+            .addScanIterator(new IteratorSetting(11, 
MemoryConsumingIterator.class, Map.of()));
+        memoryConsumingScanner.setBatchSize(1);
+        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+        t.start();
+
+        // Wait until the dataConsumingScanner has started fetching data
+        int currentCount = fetched.get();
+        while (currentCount == 0) {
+          Thread.sleep(500);
+          currentCount = fetched.get();
+        }
+
+        // This should block until the GarbageCollectionLogger runs and 
notices that the
+        // VM is low on memory.
+        Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
+        assertTrue(consumingIter.hasNext());
+
+        // Confirm that some data was fetched by the dataConsumingScanner
+        currentCount = fetched.get();
+        assertTrue(currentCount > 0 && currentCount < 100);
+
+        // Grab the current paused count, wait two seconds and then confirm 
that
+        // the number of rows fetched by the memoryConsumingScanner has not 
increased
+        // and that the scan delay counter has increased.
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
+
+        // Perform the check again
+        paused = SCAN_START_DELAYED.doubleValue();
+        returned = SCAN_RETURNED_EARLY.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() == returned);
+
+        // Free the memory which will allow the pausing scanner to continue
+        freeServerMemory(client, table);
+
+        t.join();
+        assertEquals(30, fetched.get());
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 48a21f63cf..6aae998a11 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -90,7 +90,8 @@ public class MetricsIT extends ConfigurableMacBase implements 
MetricsProducer {
     cluster.stop();
 
     Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, 
METRICS_UPDATE_ERRORS,
-        METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT);
+        METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT, 
METRICS_SCAN_PAUSED_FOR_MEM,
+        METRICS_SCAN_RETURN_FOR_MEM, METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED);
     Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS, 
METRICS_FATE_TYPE_IN_PROGRESS,
         METRICS_PROPSTORE_EVICTION_COUNT, METRICS_PROPSTORE_REFRESH_COUNT,
         METRICS_PROPSTORE_REFRESH_LOAD_COUNT, 
METRICS_PROPSTORE_ZK_ERROR_COUNT);

Reply via email to