This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 88c2f7bbb6997f6bcc127f4391bb9f87c20d763b
Merge: 3e85f58ab9 dcbc5d606a
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Aug 27 22:05:01 2024 +0000

    Merge branch '2.1' into 3.1

 .../TabletServerBatchReaderIterator.java           |  20 +-
 .../accumulo/core/metrics/MetricsProducer.java     |   8 +
 .../org/apache/accumulo/tserver/ScanServer.java    |   1 +
 .../org/apache/accumulo/tserver/TabletServer.java  |   1 +
 .../accumulo/tserver/ThriftScanClientHandler.java  |  10 +-
 .../tserver/metrics/TabletServerScanMetrics.java   |  13 +-
 .../apache/accumulo/tserver/scan/LookupTask.java   |   2 +-
 .../accumulo/tserver/scan/NextBatchTask.java       |   2 +-
 .../accumulo/tserver/scan/ScanParameters.java      |  10 +
 .../org/apache/accumulo/tserver/scan/ScanTask.java |  93 ++++-
 .../accumulo/tserver/session/ScanSession.java      |  69 +++-
 .../apache/accumulo/tserver/session/Session.java   |  14 +-
 .../accumulo/tserver/session/SessionManager.java   |  67 +++-
 .../accumulo/tserver/tablet/ScanDataSource.java    |   4 +
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  39 ++-
 .../tserver/session/SessionManagerTest.java        |  42 +++
 .../org/apache/accumulo/test/ZombieScanIT.java     | 387 +++++++++++++++++++++
 .../test/functional/ScanSessionTimeOutIT.java      |  19 +-
 .../apache/accumulo/test/functional/ScannerIT.java |  81 +++++
 19 files changed, 855 insertions(+), 27 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index dadd9cbca6,3df2b10883..dd1e493e6c
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -656,9 -681,7 +663,10 @@@ public interface MetricsProducer 
    String METRICS_SCAN_QUERY_SCAN_RESULTS = METRICS_SCAN_PREFIX + 
"query.results";
    String METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES = METRICS_SCAN_PREFIX + 
"query.results.bytes";
    String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + 
"query.scanned.entries";
 +  String METRICS_SCAN_PAUSED_FOR_MEM = METRICS_SCAN_PREFIX + 
"paused.for.memory";
 +  String METRICS_SCAN_RETURN_FOR_MEM = METRICS_SCAN_PREFIX + 
"return.early.for.memory";
 +
+   String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads";
    String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + 
"tablet.metadata.cache";
  
    String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d1823f9348,29d392f1cc..41c6409063
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -709,9 -766,9 +709,10 @@@ public class TabletServer extends Abstr
      metrics = new TabletServerMetrics(this);
      updateMetrics = new TabletServerUpdateMetrics();
      scanMetrics = new TabletServerScanMetrics();
+     sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads);
      mincMetrics = new TabletServerMinCMetrics();
      ceMetrics = new CompactionExecutorsMetrics();
 +    pausedMetrics = new PausedCompactionMetrics();
      blockCacheMetrics = new 
BlockCacheMetrics(this.resourceManager.getIndexCache(),
          this.resourceManager.getDataCache(), 
this.resourceManager.getSummaryCache());
  
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index ea246c1c72,a46a8bdeab..612045cb90
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@@ -39,13 -38,11 +39,13 @@@ public class TabletServerScanMetrics im
    private Timer scans = NoopMetrics.useNoopTimer();
    private DistributionSummary resultsPerScan = 
NoopMetrics.useNoopDistributionSummary();
    private DistributionSummary yields = 
NoopMetrics.useNoopDistributionSummary();
 -  private Counter startScanCalls = NoopMetrics.useNoopCounter();
 -  private Counter continueScanCalls = NoopMetrics.useNoopCounter();;
 -  private Counter closeScanCalls = NoopMetrics.useNoopCounter();;
 -  private Counter busyTimeoutCount = NoopMetrics.useNoopCounter();;
 +  private final AtomicLong startScanCalls = new AtomicLong(0);
 +  private final AtomicLong continueScanCalls = new AtomicLong(0);
 +  private final AtomicLong closeScanCalls = new AtomicLong(0);
 +  private final AtomicLong busyTimeoutCount = new AtomicLong(0);
 +  private final AtomicLong pausedForMemory = new AtomicLong(0);
 +  private final AtomicLong earlyReturnForMemory = new AtomicLong(0);
- 
+   private final AtomicLong zombieScanThreads = new AtomicLong(0);
    private final LongAdder lookupCount = new LongAdder();
    private final LongAdder queryResultCount = new LongAdder();
    private final LongAdder queryResultBytes = new LongAdder();
@@@ -87,30 -104,30 +87,38 @@@
      openFiles.addAndGet(delta < 0 ? delta : delta * -1);
    }
  
 -  public void incrementStartScan(double value) {
 -    startScanCalls.increment(value);
 +  public void incrementStartScan() {
 +    startScanCalls.incrementAndGet();
    }
  
 -  public void incrementContinueScan(double value) {
 -    continueScanCalls.increment(value);
 +  public void incrementContinueScan() {
 +    continueScanCalls.incrementAndGet();
    }
  
 -  public void incrementCloseScan(double value) {
 -    closeScanCalls.increment(value);
 +  public void incrementCloseScan() {
 +    closeScanCalls.incrementAndGet();
    }
  
 -  public void incrementBusy(double value) {
 -    busyTimeoutCount.increment(value);
 +  public void incrementBusy() {
 +    busyTimeoutCount.incrementAndGet();
 +  }
 +
 +  public void incrementScanPausedForLowMemory() {
 +    pausedForMemory.incrementAndGet();
 +  }
 +
 +  public void incrementEarlyReturnForLowMemory() {
 +    earlyReturnForMemory.incrementAndGet();
    }
  
+   public void setZombieScanThreads(long count) {
+     zombieScanThreads.set(count);
+   }
+ 
+   public long getZombieThreadsCount() {
+     return zombieScanThreads.get();
+   }
+ 
    @Override
    public void registerMetrics(MeterRegistry registry) {
      Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::get)
@@@ -120,28 -137,29 +128,31 @@@
          .description("Results per scan").register(registry);
      yields =
          
DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry);
 -    startScanCalls = Counter.builder(METRICS_SCAN_START)
 +    FunctionCounter.builder(METRICS_SCAN_START, this.startScanCalls, 
AtomicLong::get)
          .description("calls to start a scan / multiscan").register(registry);
 -    continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE)
 +    FunctionCounter.builder(METRICS_SCAN_CONTINUE, this.continueScanCalls, 
AtomicLong::get)
          .description("calls to continue a scan / 
multiscan").register(registry);
 -    closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE)
 +    FunctionCounter.builder(METRICS_SCAN_CLOSE, this.closeScanCalls, 
AtomicLong::get)
          .description("calls to close a scan / multiscan").register(registry);
 -    busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER)
 +    FunctionCounter
 +        .builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER, this.busyTimeoutCount, 
AtomicLong::get)
          .description("The number of scans where a busy timeout 
happened").register(registry);
 -    Gauge.builder(METRICS_SCAN_QUERIES, this, 
TabletServerScanMetrics::getLookupCount)
 +    FunctionCounter.builder(METRICS_SCAN_QUERIES, this.lookupCount, 
LongAdder::sum)
          .description("Number of queries").register(registry);
 -    Gauge
 -        .builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this,
 -            TabletServerScanMetrics::getQueryResultCount)
 +    FunctionCounter.builder(METRICS_SCAN_SCANNED_ENTRIES, this.scannedCount, 
LongAdder::sum)
 +        .description("Scanned rate").register(registry);
 +    FunctionCounter.builder(METRICS_SCAN_PAUSED_FOR_MEM, 
this.pausedForMemory, AtomicLong::get)
 +        .description("scan paused due to server being low on 
memory").register(registry);
 +    FunctionCounter.builder(METRICS_SCAN_RETURN_FOR_MEM, 
this.earlyReturnForMemory, AtomicLong::get)
 +        .description("scan returned results early due to server being low on 
memory")
 +        .register(registry);
 +    Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this.queryResultCount, 
LongAdder::sum)
          .description("Query rate (entries/sec)").register(registry);
 -    Gauge
 -        .builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this,
 -            TabletServerScanMetrics::getQueryByteCount)
 +    Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, 
this.queryResultBytes, LongAdder::sum)
          .description("Query rate (bytes/sec)").register(registry);
 -    Gauge.builder(METRICS_SCAN_SCANNED_ENTRIES, this, 
TabletServerScanMetrics::getScannedCount)
 -        .description("Scanned rate").register(registry);
+     Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, 
TabletServerScanMetrics::getZombieThreadsCount)
+         .description("Number of scan threads that have no associated client 
session")
+         .register(registry);
    }
  
  }
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index d24146f983,d32dbcd14c..dc95bde89b
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -36,7 -38,10 +37,9 @@@ import java.util.concurrent.BlockingQue
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import java.util.concurrent.ScheduledFuture;
 -import java.util.concurrent.TimeUnit;
+ import java.util.function.LongConsumer;
  import java.util.stream.Collectors;
+ import java.util.stream.Stream;
  
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
@@@ -68,7 -74,9 +71,8 @@@ public class SessionManager 
    private final long maxUpdateIdle;
    private final BlockingQueue<Session> deferredCleanupQueue = new 
ArrayBlockingQueue<>(5000);
    private final Long expiredSessionMarker = (long) -1;
 -  private final AccumuloConfiguration aconf;
    private final ServerContext ctx;
+   private volatile LongConsumer zombieCountConsumer = null;
  
    public SessionManager(ServerContext context) {
      this.ctx = context;
@@@ -231,19 -242,50 +236,46 @@@
        if (session.getState() == State.RESERVED) {
          return false;
        }
 -
        session.setState(State.REMOVED);
 -      removed = true;
      }
  
 -    if (removed) {
 -      sessions.remove(sessionId);
 -    }
 +    sessions.remove(sessionId);
  
 -    return removed;
 +    return true;
    }
  
+   /**
+    * Prevents a session from ever being reserved in the future. This method 
can be called
+    * concurrently when another thread has the session reserved w/o impacting 
the other thread. When
+    * the session is currently reserved by another thread that thread can 
unreserve as normal and
+    * after that this session can never be reserved again. Since the session 
can never be reserved
+    * after this call it will eventually age off and be cleaned up.
+    *
+    * @return true if the session is currently not reserved, false otherwise
+    */
+   public boolean disallowNewReservations(long sessionId) {
+     var session = getSession(sessionId);
+     if (session == null) {
+       return true;
+     }
+     synchronized (session) {
+       if (session.allowReservation) {
+         // Prevent future reservations of this session.
+         session.allowReservation = false;
+         log.debug("disabled session {}", sessionId);
+       }
+ 
+       // If nothing can reserve the session and it is not currently reserved 
then the session is
+       // disabled and will eventually be cleaned up.
+       return session.getState() != State.RESERVED;
+     }
+   }
+ 
    static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session 
session) {
      if (!session.cleanup()) {
 -      var retry = Retry.builder().infiniteRetries().retryAfter(25, 
MILLISECONDS)
 -          .incrementBy(25, MILLISECONDS).maxWait(5, 
SECONDS).backOffFactor(1.5)
 -          .logInterval(1, MINUTES).createRetry();
 +      var retry = 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
 +          
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(5)).backOffFactor(1.5)
 +          .logInterval(Duration.ofMinutes(1)).createRetry();
  
        while (!deferredCleanupQueue.offer(session)) {
          if (session.cleanup()) {
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 511ac427a3,741a02a18a..79cdbbe279
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@@ -18,7 -18,9 +18,9 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
  
  import java.time.Duration;
  import java.util.Iterator;
@@@ -117,12 -120,25 +119,25 @@@ public class ScanSessionTimeOutIT exten
          Iterator<Entry<Key,Value>> iter = scanner.iterator();
  
          verify(iter, 0, 200);
+         // There should be a scan session open since not all data was read 
from the iterator
+         assertEquals(1L, countActiveScans(c, tableName));
  
          // sleep three times the session timeout
 -        sleepUninterruptibly(9, TimeUnit.SECONDS);
 +        Thread.sleep(SECONDS.toMillis(9));
- 
-         verify(iter, 200, 100000);
+         // The scan session should have timed out and the next read should 
create a new one
+         assertEquals(0L, countActiveScans(c, tableName));
+ 
+         verify(iter, 200, 50000);
+         // Reading part of the data in the range should cause a new scan 
session to be created
+         assertEquals(1L, countActiveScans(c, tableName));
+         verify(iter, 50000, 100000);
+         // Once all of the data in the range was read the scanner should 
automatically close the
+         // scan session
+         assertEquals(0L, countActiveScans(c, tableName));
        }
+ 
+       // Nothing should have created any new scan sessions for the table
+       assertEquals(0L, countActiveScans(c, tableName));
      }
    }
  
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 697452ccbd,57c511f6ae..0a479e2df4
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@@ -34,7 -37,9 +37,8 @@@ import org.apache.accumulo.core.data.Mu
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.test.util.Wait;
  import org.junit.jupiter.api.Test;
  
  public class ScannerIT extends AccumuloClusterHarness {

Reply via email to