This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 88c2f7bbb6997f6bcc127f4391bb9f87c20d763b Merge: 3e85f58ab9 dcbc5d606a Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 27 22:05:01 2024 +0000 Merge branch '2.1' into 3.1 .../TabletServerBatchReaderIterator.java | 20 +- .../accumulo/core/metrics/MetricsProducer.java | 8 + .../org/apache/accumulo/tserver/ScanServer.java | 1 + .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/ThriftScanClientHandler.java | 10 +- .../tserver/metrics/TabletServerScanMetrics.java | 13 +- .../apache/accumulo/tserver/scan/LookupTask.java | 2 +- .../accumulo/tserver/scan/NextBatchTask.java | 2 +- .../accumulo/tserver/scan/ScanParameters.java | 10 + .../org/apache/accumulo/tserver/scan/ScanTask.java | 93 ++++- .../accumulo/tserver/session/ScanSession.java | 69 +++- .../apache/accumulo/tserver/session/Session.java | 14 +- .../accumulo/tserver/session/SessionManager.java | 67 +++- .../accumulo/tserver/tablet/ScanDataSource.java | 4 + .../org/apache/accumulo/tserver/tablet/Tablet.java | 39 ++- .../tserver/session/SessionManagerTest.java | 42 +++ .../org/apache/accumulo/test/ZombieScanIT.java | 387 +++++++++++++++++++++ .../test/functional/ScanSessionTimeOutIT.java | 19 +- .../apache/accumulo/test/functional/ScannerIT.java | 81 +++++ 19 files changed, 855 insertions(+), 27 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index dadd9cbca6,3df2b10883..dd1e493e6c --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -656,9 -681,7 +663,10 @@@ public interface MetricsProducer String METRICS_SCAN_QUERY_SCAN_RESULTS = METRICS_SCAN_PREFIX + "query.results"; String METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES = METRICS_SCAN_PREFIX + "query.results.bytes"; String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries"; + String METRICS_SCAN_PAUSED_FOR_MEM = METRICS_SCAN_PREFIX + 
"paused.for.memory"; + String METRICS_SCAN_RETURN_FOR_MEM = METRICS_SCAN_PREFIX + "return.early.for.memory"; + + String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads"; String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache"; String METRICS_TSERVER_PREFIX = "accumulo.tserver."; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d1823f9348,29d392f1cc..41c6409063 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -709,9 -766,9 +709,10 @@@ public class TabletServer extends Abstr metrics = new TabletServerMetrics(this); updateMetrics = new TabletServerUpdateMetrics(); scanMetrics = new TabletServerScanMetrics(); + sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); mincMetrics = new TabletServerMinCMetrics(); ceMetrics = new CompactionExecutorsMetrics(); + pausedMetrics = new PausedCompactionMetrics(); blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache()); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index ea246c1c72,a46a8bdeab..612045cb90 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@@ -39,13 -38,11 +39,13 @@@ public class TabletServerScanMetrics im private Timer scans = NoopMetrics.useNoopTimer(); private DistributionSummary resultsPerScan = NoopMetrics.useNoopDistributionSummary(); private DistributionSummary yields = NoopMetrics.useNoopDistributionSummary(); - private Counter startScanCalls = NoopMetrics.useNoopCounter(); - private Counter continueScanCalls = NoopMetrics.useNoopCounter();; - private Counter closeScanCalls = 
NoopMetrics.useNoopCounter();; - private Counter busyTimeoutCount = NoopMetrics.useNoopCounter();; + private final AtomicLong startScanCalls = new AtomicLong(0); + private final AtomicLong continueScanCalls = new AtomicLong(0); + private final AtomicLong closeScanCalls = new AtomicLong(0); + private final AtomicLong busyTimeoutCount = new AtomicLong(0); + private final AtomicLong pausedForMemory = new AtomicLong(0); + private final AtomicLong earlyReturnForMemory = new AtomicLong(0); - + private final AtomicLong zombieScanThreads = new AtomicLong(0); private final LongAdder lookupCount = new LongAdder(); private final LongAdder queryResultCount = new LongAdder(); private final LongAdder queryResultBytes = new LongAdder(); @@@ -87,30 -104,30 +87,38 @@@ openFiles.addAndGet(delta < 0 ? delta : delta * -1); } - public void incrementStartScan(double value) { - startScanCalls.increment(value); + public void incrementStartScan() { + startScanCalls.incrementAndGet(); } - public void incrementContinueScan(double value) { - continueScanCalls.increment(value); + public void incrementContinueScan() { + continueScanCalls.incrementAndGet(); } - public void incrementCloseScan(double value) { - closeScanCalls.increment(value); + public void incrementCloseScan() { + closeScanCalls.incrementAndGet(); } - public void incrementBusy(double value) { - busyTimeoutCount.increment(value); + public void incrementBusy() { + busyTimeoutCount.incrementAndGet(); + } + + public void incrementScanPausedForLowMemory() { + pausedForMemory.incrementAndGet(); + } + + public void incrementEarlyReturnForLowMemory() { + earlyReturnForMemory.incrementAndGet(); } + public void setZombieScanThreads(long count) { + zombieScanThreads.set(count); + } + + public long getZombieThreadsCount() { + return zombieScanThreads.get(); + } + @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::get) @@@ -120,28 -137,29 +128,31 @@@ .description("Results per 
scan").register(registry); yields = DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry); - startScanCalls = Counter.builder(METRICS_SCAN_START) + FunctionCounter.builder(METRICS_SCAN_START, this.startScanCalls, AtomicLong::get) .description("calls to start a scan / multiscan").register(registry); - continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE) + FunctionCounter.builder(METRICS_SCAN_CONTINUE, this.continueScanCalls, AtomicLong::get) .description("calls to continue a scan / multiscan").register(registry); - closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE) + FunctionCounter.builder(METRICS_SCAN_CLOSE, this.closeScanCalls, AtomicLong::get) .description("calls to close a scan / multiscan").register(registry); - busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER) + FunctionCounter + .builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER, this.busyTimeoutCount, AtomicLong::get) .description("The number of scans where a busy timeout happened").register(registry); - Gauge.builder(METRICS_SCAN_QUERIES, this, TabletServerScanMetrics::getLookupCount) + FunctionCounter.builder(METRICS_SCAN_QUERIES, this.lookupCount, LongAdder::sum) .description("Number of queries").register(registry); - Gauge - .builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this, - TabletServerScanMetrics::getQueryResultCount) + FunctionCounter.builder(METRICS_SCAN_SCANNED_ENTRIES, this.scannedCount, LongAdder::sum) + .description("Scanned rate").register(registry); + FunctionCounter.builder(METRICS_SCAN_PAUSED_FOR_MEM, this.pausedForMemory, AtomicLong::get) + .description("scan paused due to server being low on memory").register(registry); + FunctionCounter.builder(METRICS_SCAN_RETURN_FOR_MEM, this.earlyReturnForMemory, AtomicLong::get) + .description("scan returned results early due to server being low on memory") + .register(registry); + Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this.queryResultCount, LongAdder::sum) .description("Query rate 
(entries/sec)").register(registry); - Gauge - .builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this, - TabletServerScanMetrics::getQueryByteCount) + Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this.queryResultBytes, LongAdder::sum) .description("Query rate (bytes/sec)").register(registry); - Gauge.builder(METRICS_SCAN_SCANNED_ENTRIES, this, TabletServerScanMetrics::getScannedCount) - .description("Scanned rate").register(registry); + Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, TabletServerScanMetrics::getZombieThreadsCount) + .description("Number of scan threads that have no associated client session") + .register(registry); } } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index d24146f983,d32dbcd14c..dc95bde89b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@@ -36,7 -38,10 +37,9 @@@ import java.util.concurrent.BlockingQue import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; + import java.util.function.LongConsumer; import java.util.stream.Collectors; + import java.util.stream.Stream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@@ -68,7 -74,9 +71,8 @@@ public class SessionManager private final long maxUpdateIdle; private final BlockingQueue<Session> deferredCleanupQueue = new ArrayBlockingQueue<>(5000); private final Long expiredSessionMarker = (long) -1; - private final AccumuloConfiguration aconf; private final ServerContext ctx; + private volatile LongConsumer zombieCountConsumer = null; public SessionManager(ServerContext context) { this.ctx = context; @@@ -231,19 -242,50 +236,46 @@@ if (session.getState() == State.RESERVED) { return false; } - session.setState(State.REMOVED); - 
removed = true; } - if (removed) { - sessions.remove(sessionId); - } + sessions.remove(sessionId); - return removed; + return true; } + /** + * Prevents a session from ever being reserved in the future. This method can be called + * concurrently when another thread has the session reserved w/o impacting the other thread. When + * the session is currently reserved by another thread that thread can unreserve as normal and + * after that this session can never be reserved again. Since the session can never be reserved + * after this call it will eventually age off and be cleaned up. + * + * @return true if the session is currently not reserved, false otherwise + */ + public boolean disallowNewReservations(long sessionId) { + var session = getSession(sessionId); + if (session == null) { + return true; + } + synchronized (session) { + if (session.allowReservation) { + // Prevent future reservations of this session. + session.allowReservation = false; + log.debug("disabled session {}", sessionId); + } + + // If nothing can reserve the session and it is not currently reserved then the session is + // disabled and will eventually be cleaned up. 
+ return session.getState() != State.RESERVED; + } + } + static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session session) { if (!session.cleanup()) { - var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) - .incrementBy(25, MILLISECONDS).maxWait(5, SECONDS).backOffFactor(1.5) - .logInterval(1, MINUTES).createRetry(); + var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(5)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(1)).createRetry(); while (!deferredCleanupQueue.offer(session)) { if (session.cleanup()) { diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java index 511ac427a3,741a02a18a..79cdbbe279 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java @@@ -18,7 -18,9 +18,9 @@@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.SECONDS; + import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; + import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Duration; import java.util.Iterator; @@@ -117,12 -120,25 +119,25 @@@ public class ScanSessionTimeOutIT exten Iterator<Entry<Key,Value>> iter = scanner.iterator(); verify(iter, 0, 200); + // There should be a scan session open since not all data was read from the iterator + assertEquals(1L, countActiveScans(c, tableName)); // sleep three times the session timeout - sleepUninterruptibly(9, TimeUnit.SECONDS); + Thread.sleep(SECONDS.toMillis(9)); - - verify(iter, 200, 100000); + // The scan session should have timed out and the next read should create a new one + assertEquals(0L, countActiveScans(c, tableName)); + + verify(iter, 200, 50000); + // Reading part of the 
data in the range should cause a new scan session to be created + assertEquals(1L, countActiveScans(c, tableName)); + verify(iter, 50000, 100000); + // Once all of the data in the range was read the scanner should automatically close the + // scan session + assertEquals(0L, countActiveScans(c, tableName)); } + + // Nothing should have created any new scan sessions for the table + assertEquals(0L, countActiveScans(c, tableName)); } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index 697452ccbd,57c511f6ae..0a479e2df4 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@@ -34,7 -37,9 +37,8 @@@ import org.apache.accumulo.core.data.Mu import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.test.util.Wait; import org.junit.jupiter.api.Test; public class ScannerIT extends AccumuloClusterHarness {