This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 053acd43ce adds metric to count zombie scans (#4840) 053acd43ce is described below commit 053acd43ce1d04afdd012ccb73d057f51adfc580 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 27 13:43:01 2024 -0700 adds metric to count zombie scans (#4840) Changes in this commit: * Scan sessions with an associated running thread will continue to show up in listscans. Previously they would stop showing up after an attempt was made to interrupt the thread. * When a scan session is removed and it has an associated running thread, that thread will be interrupted. This is in addition to setting the interrupt atomic boolean. Previously only the atomic boolean was set. * A new metric will count removed scan sessions that still have a running thread. Scan sessions are removed when the client is no longer interested in the result. The new metric name is `accumulo.scan.zombie.threads`. * A new test checks that the new metric works, that listscans shows zombie scans, and that stuck threads are interrupted. * Scan sessions can have an associated scan task, and a scan task can have an associated thread. Added checks to ensure that the scan task for a scan session is not cleared while it has an active thread. This helps ensure that zombie thread detection works. It is also a good general improvement, as clearing the task while it has an active thread could result in silent data loss. The current code does not appear to ever violate this check. 
--- .../accumulo/core/metrics/MetricsProducer.java | 9 +- .../org/apache/accumulo/tserver/ScanServer.java | 1 + .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/ThriftScanClientHandler.java | 6 +- .../tserver/metrics/TabletServerScanMetrics.java | 14 +- .../apache/accumulo/tserver/scan/LookupTask.java | 2 +- .../accumulo/tserver/scan/NextBatchTask.java | 2 +- .../org/apache/accumulo/tserver/scan/ScanTask.java | 93 +++++++- .../accumulo/tserver/session/ScanSession.java | 69 +++++- .../apache/accumulo/tserver/session/Session.java | 13 +- .../accumulo/tserver/session/SessionManager.java | 31 +++ .../org/apache/accumulo/test/ZombieScanIT.java | 255 +++++++++++++++++++++ 12 files changed, 478 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index e82d7ec690..3df2b10883 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -463,6 +463,13 @@ import io.micrometer.core.instrument.MeterRegistry; * <td>Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be * derived</td> * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_SCAN_ZOMBIE_THREADS}</td> + * <td>Gauge</td> + * <td></td> + * </tr> * <!-- major compactions --> * <tr> * <td>{i|e}_{compactionServiceName}_{executor_name}_queued</td> @@ -674,7 +681,7 @@ public interface MetricsProducer { String METRICS_SCAN_QUERY_SCAN_RESULTS = METRICS_SCAN_PREFIX + "query.results"; String METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES = METRICS_SCAN_PREFIX + "query.results.bytes"; String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries"; - + String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads"; String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + 
"tablet.metadata.cache"; String METRICS_TSERVER_PREFIX = "accumulo.tserver."; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index a348f25951..0e74b8c732 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -408,6 +408,7 @@ public class ScanServer extends AbstractServer metricsInfo.addCommonTags(List.of(Tag.of("resource.group", groupName))); scanMetrics = new TabletServerScanMetrics(); + sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); scanServerMetrics = new ScanServerMetrics(tabletMetadataCache); blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(), resourceManager.getDataCache(), resourceManager.getSummaryCache()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 87c3a00277..29d392f1cc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -766,6 +766,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer metrics = new TabletServerMetrics(this); updateMetrics = new TabletServerUpdateMetrics(); scanMetrics = new TabletServerScanMetrics(); + sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); mincMetrics = new TabletServerMinCMetrics(); ceMetrics = new CompactionExecutorsMetrics(); blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 666076caeb..6bacdcab0c 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -262,7 +262,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { try { bresult = scanSession.getScanTask().get(busyTimeout, MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); - scanSession.setScanTask(null); + scanSession.clearScanTask(); } catch (ExecutionException e) { server.getSessionManager().removeSession(scanID); if (e.getCause() instanceof NotServingTabletException) { @@ -276,7 +276,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); List<KVEntry> empty = Collections.emptyList(); bresult = new ScanBatch(empty, true); - scanSession.setScanTask(null); + scanSession.clearScanTask(); } else { throw new RuntimeException(e); } @@ -482,7 +482,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { MultiScanResult scanResult = session.getScanTask().get(busyTimeout, MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); - session.setScanTask(null); + session.clearScanTask(); return scanResult; } catch (ExecutionException e) { server.getSessionManager().removeSession(scanID); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index c2c8dd0d32..a46a8bdeab 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.metrics; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.metrics.MetricsProducer; @@ -41,7 +42,7 @@ public class TabletServerScanMetrics implements MetricsProducer { private Counter continueScanCalls = NoopMetrics.useNoopCounter();; private Counter closeScanCalls = NoopMetrics.useNoopCounter();; private Counter busyTimeoutCount = NoopMetrics.useNoopCounter();; - + private final AtomicLong zombieScanThreads = new AtomicLong(0); private final LongAdder lookupCount = new LongAdder(); private final LongAdder queryResultCount = new LongAdder(); private final LongAdder queryResultBytes = new LongAdder(); @@ -119,6 +120,14 @@ public class TabletServerScanMetrics implements MetricsProducer { busyTimeoutCount.increment(value); } + public void setZombieScanThreads(long count) { + zombieScanThreads.set(count); + } + + public long getZombieThreadsCount() { + return zombieScanThreads.get(); + } + @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::get) @@ -148,6 +157,9 @@ public class TabletServerScanMetrics implements MetricsProducer { .description("Query rate (bytes/sec)").register(registry); Gauge.builder(METRICS_SCAN_SCANNED_ENTRIES, this, TabletServerScanMetrics::getScannedCount) .description("Scanned rate").register(registry); + Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, TabletServerScanMetrics::getZombieThreadsCount) + .description("Number of scan threads that have no associated client session") + .register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java index fcd159e0ca..351946bb8c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java @@ -183,8 +183,8 @@ public class LookupTask extends 
ScanTask<MultiScanResult> { log.warn("exception while doing multi-scan ", e); addResult(e); } finally { + transitionFromRunning(); Thread.currentThread().setName(oldThreadName); - runState.set(ScanRunState.FINISHED); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index de26f163ae..9d601f4342 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -98,7 +98,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> { e); addResult(e); } finally { - runState.set(ScanRunState.FINISHED); + transitionFromRunning(); Thread.currentThread().setName(oldThreadName); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java index 6adbda60fd..96a33f50b1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java @@ -26,16 +26,23 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.accumulo.tserver.TabletHostingServer; +import com.google.common.base.Preconditions; + public abstract class ScanTask<T> implements Runnable { protected final TabletHostingServer server; protected AtomicBoolean interruptFlag; protected ArrayBlockingQueue<Object> resultQueue; protected AtomicInteger state; - protected AtomicReference<ScanRunState> runState; + private AtomicReference<ScanRunState> runState; + + private Thread scanThread = null; + private final Lock 
scanThreadLock = new ReentrantLock(); private static final int INITIAL = 1; private static final int ADDED = 2; @@ -50,7 +57,58 @@ public abstract class ScanTask<T> implements Runnable { } protected boolean transitionToRunning() { - return runState.compareAndSet(ScanRunState.QUEUED, ScanRunState.RUNNING); + if (runState.compareAndSet(ScanRunState.QUEUED, ScanRunState.RUNNING)) { + scanThreadLock.lock(); + try { + Preconditions.checkState(scanThread == null); + scanThread = Thread.currentThread(); + } finally { + scanThreadLock.unlock(); + } + return true; + } else { + return false; + } + } + + protected void transitionFromRunning() { + scanThreadLock.lock(); + try { + Preconditions.checkState(scanThread != null); + scanThread = null; + } finally { + scanThreadLock.unlock(); + } + runState.compareAndSet(ScanRunState.RUNNING, ScanRunState.FINISHED); + } + + public static class ScanThreadStackTrace { + public final long threadId; + public final String threadName; + public final StackTraceElement[] stackTrace; + + private ScanThreadStackTrace(Thread thread) { + this.threadId = thread.getId(); + this.stackTrace = thread.getStackTrace(); + this.threadName = thread.getName(); + } + } + + public ScanThreadStackTrace getStackTrace() { + // Acquire the scanThreadLock to ensure we only get the stack trace when the thread is executing + // the scan task for this code. The threads could be thread pool threads and if they exit the + // task they could move on to process an unrelated scan task. Should not get unrelated stack + // traces when using the lock. 
+ scanThreadLock.lock(); + try { + if (scanThread == null) { + return null; + } + + return new ScanThreadStackTrace(scanThread); + } finally { + scanThreadLock.unlock(); + } } protected void addResult(Object o) { @@ -67,16 +125,26 @@ public abstract class ScanTask<T> implements Runnable { "Cancel will always attempt to interrupt running next batch task"); } - if (state.get() == CANCELED) { - return true; - } - if (state.compareAndSet(INITIAL, CANCELED)) { interruptFlag.set(true); resultQueue = null; return true; } + if (state.get() == CANCELED) { + scanThreadLock.lock(); + try { + if (scanThread != null) { + // Doing the interrupt while the scanThreadLock is held prevents race conditions where we + // interrupt a thread pool thread that has moved onto another unrelated task. + scanThread.interrupt(); + } + } finally { + scanThreadLock.unlock(); + } + return true; + } + return false; } @@ -172,8 +240,21 @@ public abstract class ScanTask<T> implements Runnable { return state.get() == CANCELED; } + public boolean producedResult() { + return state.get() == ADDED; + } + public ScanRunState getScanRunState() { return runState.get(); } + public Thread getScanThread() { + scanThreadLock.lock(); + try { + return scanThread; + } finally { + scanThreadLock.unlock(); + } + } + } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java index 32e6c8bcaf..557fd4eb3b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java @@ -21,8 +21,11 @@ package org.apache.accumulo.tserver.session; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; 
import java.util.stream.Collectors; import org.apache.accumulo.core.data.Column; @@ -36,11 +39,15 @@ import org.apache.accumulo.core.util.Stat; import org.apache.accumulo.tserver.scan.ScanParameters; import org.apache.accumulo.tserver.scan.ScanTask; import org.apache.accumulo.tserver.tablet.TabletBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public abstract class ScanSession<T> extends Session implements ScanInfo { + private static final Logger log = LoggerFactory.getLogger(ScanSession.class); + public interface TabletResolver { TabletBase getTablet(KeyExtent extent); @@ -82,7 +89,7 @@ public abstract class ScanSession<T> extends Session implements ScanInfo { private final Map<String,String> executionHints; private final TabletResolver tabletResolver; - private volatile ScanTask<T> scanTask; + private final AtomicReference<ScanTask<T>> scanTaskRef = new AtomicReference<>(); ScanSession(TCredentials credentials, ScanParameters scanParams, Map<String,String> executionHints, TabletResolver tabletResolver) { @@ -184,17 +191,71 @@ public abstract class ScanSession<T> extends Session implements ScanInfo { } public ScanTask<T> getScanTask() { - return scanTask; + return scanTaskRef.get(); } public void setScanTask(ScanTask<T> scanTask) { - this.scanTask = scanTask; + Objects.requireNonNull(scanTask); + scanTaskRef.getAndUpdate(currScanTask -> { + Preconditions.checkState(currScanTask == null, + "Unable to set a scan task when one is already set"); + return scanTask; + }); + } + + public void clearScanTask() { + scanTaskRef.getAndUpdate(currScanTask -> { + // For tracking zombie scan threads, do not want to clear the scan task if it has an active + // thread. When the thread is not null and the task has produced a result, the thread should + // be in + // the process of clearing itself from the scan task. 
+ Preconditions.checkState( + currScanTask == null || currScanTask.getScanThread() == null + || currScanTask.producedResult(), + "Can not clear scan task that is still running and has not produced a result"); + return null; + }); + } + + private boolean loggedZombieStackTrace = false; + + public void logZombieStackTrace() { + Preconditions.checkState(getState() == State.REMOVED); + var scanTask = scanTaskRef.get(); + if (scanTask != null) { + ScanTask.ScanThreadStackTrace scanStackTrace = scanTask.getStackTrace(); + if (scanStackTrace != null && !loggedZombieStackTrace) { + var changeTimeMillis = elaspedSinceStateChange(TimeUnit.MILLISECONDS); + var exception = + new Exception("Fake exception to capture stack trace of zombie scan. Thread id:" + + scanStackTrace.threadId + " thread name:" + scanStackTrace.threadName); + exception.setStackTrace(scanStackTrace.stackTrace); + log.warn( + "Scan session with no client active for {}ms has a zombie scan thread. Scan session info : {} ", + changeTimeMillis, this, exception); + loggedZombieStackTrace = true; + } + } + } @Override public boolean cleanup() { tabletResolver.close(); - return super.cleanup(); + + if (!super.cleanup()) { + return false; + } + + var scanTask = scanTaskRef.get(); + if (scanTask != null && scanTask.getScanThread() != null) { + // Leave the session around if there is still a scan thread associated with it. This will + // cause it to still show up in listscans and it will cause it to show up in the count of + // zombie scans. 
+ return false; + } + + return true; } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 65df78a345..3f170331c8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -18,7 +18,10 @@ */ package org.apache.accumulo.tserver.session; +import java.util.concurrent.TimeUnit; + import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.rpc.TServerUtils; public class Session { @@ -31,6 +34,7 @@ public class Session { public long lastAccessTime; public long startTime; private State state = State.NEW; + private final Timer stateChangeTimer = Timer.startNew(); private final TCredentials credentials; Session(TCredentials credentials) { @@ -51,13 +55,20 @@ public class Session { } public void setState(State state) { - this.state = state; + if (this.state != state) { + this.state = state; + stateChangeTimer.restart(); + } } public State getState() { return state; } + public long elaspedSinceStateChange(TimeUnit unit) { + return stateChangeTimer.elapsed(unit); + } + @Override public String toString() { return getClass().getSimpleName() + " " + state + " startTime:" + startTime + " lastAccessTime:" diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 7217ce9015..76ecdb62ba 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; 
import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -38,7 +39,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -73,6 +76,7 @@ public class SessionManager { private final Long expiredSessionMarker = (long) -1; private final AccumuloConfiguration aconf; private final ServerContext ctx; + private volatile LongConsumer zombieCountConsumer = null; public SessionManager(ServerContext context) { this.ctx = context; @@ -308,6 +312,33 @@ public class SessionManager { sessionsToCleanup.removeIf(Session::cleanup); sessionsToCleanup.forEach(this::cleanup); + + if (zombieCountConsumer != null) { + zombieCountConsumer.accept(countZombieScans(maxIdle)); + } + } + + private long countZombieScans(long reportTimeMillis) { + return Stream.concat(deferredCleanupQueue.stream(), sessions.values().stream()) + .filter(session -> { + if (session instanceof ScanSession) { + var scanSession = (ScanSession) session; + synchronized (scanSession) { + var scanTask = scanSession.getScanTask(); + if (scanTask != null && scanSession.getState() == State.REMOVED + && scanTask.getScanThread() != null + && scanSession.elaspedSinceStateChange(MILLISECONDS) > reportTimeMillis) { + scanSession.logZombieStackTrace(); + return true; + } + } + } + return false; + }).count(); + } + + public void setZombieCountConsumer(LongConsumer zombieCountConsumer) { + this.zombieCountConsumer = Objects.requireNonNull(zombieCountConsumer); } public void removeIfNotAccessed(final long sessionId, final long delay) { diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java 
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java new file mode 100644 index 0000000000..25f5ab3195 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class ZombieScanIT extends ConfigurableMacBase { + + private static TestStatsDSink sink; + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + super.configure(cfg, hadoopCoreSite); + + // Make sessions time out much more quickly. This will cause a session to be classified as a + // zombie scan much sooner. 
+ cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "6s"); + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "1s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + cfg.setNumTservers(1); + } + + /** + * An iterator that should get stuck forever when used + */ + public static class ZombieIterator extends WrappingIterator { + @Override + public boolean hasTop() { + // must call super.hasTop() before blocking as that will run accumulo code to setup iterator + boolean ht = super.hasTop(); + Semaphore semaphore = new Semaphore(10); + semaphore.acquireUninterruptibly(5); + // this should block forever + semaphore.acquireUninterruptibly(6); + return ht; + } + } + + /** + * An iterator that should get stuck but can be interrupted + */ + public static class StuckIterator extends WrappingIterator { + @Override + public boolean hasTop() { + try { + // must call super.hasTop() before blocking as that will run accumulo code to setup iterator + boolean ht = super.hasTop(); + Semaphore semaphore = new Semaphore(10); + semaphore.acquire(5); + // this should block forever + semaphore.acquire(6); + return ht; + } catch (InterruptedException ie) { + throw new IllegalStateException(ie); + } + } + } + + /** + * Create some zombie scans and ensure metrics for them show up. 
+ */ + @Test + public void testMetrics() throws Exception { + + Wait.waitFor(() -> { + var zsmc = getZombieScansMetric(); + return zsmc == -1 || zsmc == 0; + }); + + String table = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + c.tableOperations().create(table); + + var executor = Executors.newCachedThreadPool(); + + // start four stuck scans that should never return data + List<Future<String>> futures = new ArrayList<>(); + for (var row : List.of("2", "4")) { + // start a scan with an iterator that gets stuck and can not be interrupted + futures.add(startStuckScan(c, table, executor, row, false)); + // start a scan with an iterator that gets stuck and can be interrupted + futures.add(startStuckScan(c, table, executor, row, true)); + } + + // start four stuck scans, using a batch scanner, that should never return data + for (var row : List.of("6", "8")) { + // start a scan with an iterator that gets stuck and can not be interrupted + futures.add(startStuckBatchScan(c, table, executor, row, false)); + // start a scan with an iterator that gets stuck and can be interrupted + futures.add(startStuckBatchScan(c, table, executor, row, true)); + } + + // should eventually see the eight stuck scans running + Wait.waitFor(() -> countScansForTable(table, c) == 8); + + // Cancel the scan threads. This will cause the sessions on the server side to timeout and + // become inactive. The stuck threads on the server side related to the timed out sessions + // will be interrupted. 
+ Wait.waitFor(() -> { + futures.forEach(future -> future.cancel(true)); + return futures.stream().allMatch(Future::isDone); + }); + + // Four of the eight running scans should respond to thread interrupts and exit + Wait.waitFor(() -> countScansForTable(table, c) == 4); + + Wait.waitFor(() -> getZombieScansMetric() == 4); + + assertEquals(4, countScansForTable(table, c)); + + // start four more stuck scans with two that will ignore interrupts + futures.clear(); + futures.add(startStuckScan(c, table, executor, "0", false)); + futures.add(startStuckScan(c, table, executor, "0", true)); + futures.add(startStuckBatchScan(c, table, executor, "99", false)); + futures.add(startStuckBatchScan(c, table, executor, "0", true)); + + Wait.waitFor(() -> countScansForTable(table, c) == 8); + + // Cancel the client side scan threads. Should cause the server side threads to be + // interrupted. + Wait.waitFor(() -> { + futures.forEach(future -> future.cancel(true)); + return futures.stream().allMatch(Future::isDone); + }); + + // Two of the stuck threads should respond to interrupts on the server side and exit. 
+ Wait.waitFor(() -> countScansForTable(table, c) == 6); + + Wait.waitFor(() -> getZombieScansMetric() == 6); + + assertEquals(6, countScansForTable(table, c)); + + executor.shutdownNow(); + } + + } + + private Future<String> startStuckScan(AccumuloClient c, String table, ExecutorService executor, + String row, boolean canInterrupt) { + return executor.submit(() -> { + try (var scanner = c.createScanner(table)) { + String className; + if (canInterrupt) { + className = StuckIterator.class.getName(); + } else { + className = ZombieIterator.class.getName(); + } + IteratorSetting iter = new IteratorSetting(100, "Z", className); + scanner.addScanIterator(iter); + scanner.setRange(new Range(row)); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + } + + private Future<String> startStuckBatchScan(AccumuloClient c, String table, + ExecutorService executor, String row, boolean canInterrupt) { + return executor.submit(() -> { + try (var scanner = c.createBatchScanner(table)) { + String className; + if (canInterrupt) { + className = StuckIterator.class.getName(); + } else { + className = ZombieIterator.class.getName(); + } + + IteratorSetting iter = new IteratorSetting(100, "Z", className); + scanner.addScanIterator(iter); + scanner.setRanges(List.of(new Range(row))); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + } + + private int getZombieScansMetric() { + return sink.getLines().stream().map(TestStatsDSink::parseStatsDMetric) + .filter(metric -> metric.getName().equals(MetricsProducer.METRICS_SCAN_ZOMBIE_THREADS)) + .mapToInt(metric -> Integer.parseInt(metric.getValue())).max().orElse(-1); + } + + private static long countScansForTable(String table, AccumuloClient client) throws Exception { + var tservers = client.instanceOperations().getTabletServers(); + long count = 0; + for (String tserver : tservers) { + count += 
client.instanceOperations().getActiveScans(tserver).stream() + .filter(activeScan -> activeScan.getTable().equals(table)).count(); + } + return count; + } + +}