This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 01fd2ccb25 scan session id now available when session cleanup is deferred (#4902) 01fd2ccb25 is described below commit 01fd2ccb25a3bc7d673308dddb8b88dd4c032b04 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Thu Oct 3 09:51:59 2024 -0400 scan session id now available when session cleanup is deferred (#4902) Previously, when a scan session was attempted to be cleaned up but couldn't and was deferred for later, the scan session id would still be visible from things like the `listscans` command but would have a value of -1. This change: - makes it so that the session id is still available in this situation - improves logging related to scan session ids and logs the scan session ids in more places - tests that invalid (0 = never set, -1 = no longer tracking) scan session ids are no longer seen closes #4842 --- .../accumulo/core/clientImpl/ThriftScanner.java | 5 +- .../org/apache/accumulo/tserver/ScanServer.java | 19 ++++---- .../accumulo/tserver/ThriftScanClientHandler.java | 11 +++-- .../apache/accumulo/tserver/session/Session.java | 18 +++++++- .../accumulo/tserver/session/SessionManager.java | 54 ++++++---------------- .../org/apache/accumulo/test/ZombieScanIT.java | 29 ++++++++++++ 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index e70b0637d8..4703ac8c08 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -389,8 +389,8 @@ public class ThriftScanner { // call the scan server selector and just go back to the previous scan server addr = scanState.prevLoc; log.trace( - "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}", - loc.getExtent(), addr.serverAddress, scanState.busyTimeout); + "For tablet {} continuing scan {} on scan server {} without consulting scan server selector, using busyTimeout {}", + loc.getExtent(), scanState.scanID, addr.serverAddress, scanState.busyTimeout); } else { var tabletId = new TabletIdImpl(loc.getExtent()); // obtain a snapshot once and only expose this snapshot to the plugin for consistency @@ -898,7 +898,6 @@ public class ThriftScanner { } } else { - // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc); String msg = "Continuing scan tserver=" + addr.serverAddress + " scanid=" + scanState.scanID; Thread.currentThread().setName(msg); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 319241dc41..4dc21eb487 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -136,7 +136,7 @@ import com.google.common.net.HostAndPort; public class ScanServer extends AbstractServer implements TabletScanClientService.Iface, TabletHostingServer { - private static final Logger log = LoggerFactory.getLogger(ScanServer.class); + private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class); private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> { @@ -173,8 +173,6 @@ public class ScanServer extends AbstractServer } } - private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class); - protected ThriftScanClientHandler delegate; private UUID serverLockUUID; private final TabletMetadataLoader tabletMetadataLoader; @@ -212,8 +210,8 @@ public class ScanServer extends AbstractServer super("sserver", opts, ServerContext::new, args); context = super.getContext(); - log.info("Version " + Constants.VERSION); - log.info("Instance " + getContext().getInstanceID()); + LOG.info("Version " + Constants.VERSION); + LOG.info("Instance " + getContext().getInstanceID()); this.sessionManager = new SessionManager(context); this.resourceManager = new TabletServerResourceManager(context, this); @@ -433,11 +431,11 @@ public class ScanServer extends AbstractServer // thread to look for log sorting work in the future logSorter.startWatchingForRecoveryLogs(threadPoolSize); } catch (Exception ex) { - log.error("Error starting LogSorter"); + LOG.error("Error starting LogSorter"); throw new RuntimeException(ex); } } else { - log.info( + LOG.info( "Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1."); } @@ -987,6 +985,7 @@ public class ScanServer extends AbstractServer batchTimeOut, classLoaderContext, executionHints, getScanTabletResolver(tablet), busyTimeout); + LOG.trace("started scan: {}", is.getScanID()); return is; } catch (ScanServerBusyException be) { scanServerMetrics.incrementBusy(); @@ -1058,16 +1057,16 @@ public class ScanServer extends AbstractServer ssio, authorizations, waitForWrites, tSamplerConfig, batchTimeOut, contextArg, executionHints, getBatchScanTabletResolver(tablets), busyTimeout); - LOG.trace("started scan: {}", ims.getScanID()); + LOG.trace("started multi scan: {}", ims.getScanID()); return ims; } catch (ScanServerBusyException be) { scanServerMetrics.incrementBusy(); throw be; } catch (TException e) { - LOG.error("Error starting scan", e); + LOG.error("Error starting multi scan", e); throw e; } catch (AccumuloException e) { - LOG.error("Error starting scan", e); + LOG.error("Error starting multi scan", e); throw new RuntimeException(e); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index ee4cab0508..9344b1c4ce 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -336,8 +336,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { long t2 = System.currentTimeMillis(); if (log.isTraceEnabled()) { - log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", - TServerUtils.clientAddress.get(), ss.extent.tableId(), ss.entriesReturned, + log.trace(String.format("ScanSess %d tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", + scanID, TServerUtils.clientAddress.get(), ss.extent.tableId(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.runStats.toString())); } @@ -533,10 +533,11 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { if (log.isTraceEnabled()) { log.trace(String.format( - "MultiScanSess %s %,d entries in %.2f secs" + "MultiScanSess %d %s %,d entries in %.2f secs" + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", - TServerUtils.clientAddress.get(), session.numEntries, (t2 - session.startTime) / 1000.0, - session.totalLookupTime / 1000.0, session.numTablets, session.numRanges)); + scanID, TServerUtils.clientAddress.get(), session.numEntries, + (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, + session.numRanges)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 29247247c3..fda1395405 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -18,12 +18,15 @@ */ package org.apache.accumulo.tserver.session; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.rpc.TServerUtils; +import com.google.common.base.Preconditions; + public class Session { enum State { @@ -37,6 +40,7 @@ public class Session { boolean allowReservation = true; private final Timer stateChangeTimer = Timer.startNew(); private final TCredentials credentials; + private OptionalLong sessionId = OptionalLong.empty(); Session(TCredentials credentials) { this.credentials = credentials; @@ -66,13 +70,23 @@ public class Session { return state; } + public void setSessionId(long sessionId) { + Preconditions.checkState(this.sessionId.isEmpty()); + this.sessionId = OptionalLong.of(sessionId); + } + + public long getSessionId() { + Preconditions.checkState(this.sessionId.isPresent()); + return sessionId.orElseThrow(); + } + public long elaspedSinceStateChange(TimeUnit unit) { return stateChangeTimer.elapsed(unit); } @Override public String toString() { - return getClass().getSimpleName() + " " + state + " startTime:" + startTime + " lastAccessTime:" - + lastAccessTime + " client:" + client; + return getClass().getSimpleName() + " " + state + " sessionId:" + sessionId + " startTime:" + + startTime + " lastAccessTime:" + lastAccessTime + " client:" + client; } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 564f25c019..3a7df5347d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -24,14 +24,11 @@ import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -61,7 +58,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; public class SessionManager { private static final Logger log = LoggerFactory.getLogger(SessionManager.class); @@ -70,7 +66,6 @@ public class SessionManager { private final long maxIdle; private final long maxUpdateIdle; private final BlockingQueue<Session> deferredCleanupQueue = new ArrayBlockingQueue<>(5000); - private final Long expiredSessionMarker = (long) -1; private final ServerContext ctx; private volatile LongConsumer zombieCountConsumer = null; @@ -93,10 +88,10 @@ public class SessionManager { Preconditions.checkArgument(session.getState() == State.NEW); session.setState(reserve ? State.RESERVED : State.UNRESERVED); session.startTime = session.lastAccessTime = System.currentTimeMillis(); - } - - while (sessions.putIfAbsent(sid, session) != null) { - sid = RANDOM.get().nextLong(); + while (sessions.putIfAbsent(sid, session) != null) { + sid = RANDOM.get().nextLong(); + } + session.setSessionId(sid); } return sid; @@ -312,8 +307,8 @@ public class SessionManager { } long idleTime = System.currentTimeMillis() - session.lastAccessTime; if (idleTime > configuredIdle) { - log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), - session.client, idleTime); + log.info("Closing idle session {} from user={}, client={}, idle={}ms", + session.getSessionId(), session.getUser(), session.client, idleTime); iter.remove(); sessionsToCleanup.add(session); session.setState(State.REMOVED); @@ -382,8 +377,9 @@ public class SessionManager { } if (shouldRemove) { - log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" - + session2.client + ", duration=" + delay + "ms"); + log.info("Closing not accessed session " + session2.getSessionId() + " from user=" + + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + + "ms"); sessions.remove(sessionId); cleanup(session2); } @@ -399,18 +395,7 @@ public class SessionManager { public Map<TableId,MapCounter<ScanRunState>> getActiveScansPerTable() { Map<TableId,MapCounter<ScanRunState>> counts = new HashMap<>(); - Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - - /* - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : deferredCleanupQueue) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - - List.of(sessions.entrySet(), copiedIdleSessions).forEach(set -> set.forEach(entry -> { - - Session session = entry.getValue(); + Stream.concat(sessions.values().stream(), deferredCleanupQueue.stream()).forEach(session -> { ScanTask<?> nbt = null; TableId tableID = null; @@ -430,7 +415,7 @@ public class SessionManager { counts.computeIfAbsent(tableID, unusedKey -> new MapCounter<>()).increment(srs, 1); } } - })); + }); return counts; } @@ -439,18 +424,8 @@ public class SessionManager { final List<ActiveScan> activeScans = new ArrayList<>(); final long ct = System.currentTimeMillis(); - final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - - /* - * Add sessions that get the list returned in the active scans call - */ - for (Session session : deferredCleanupQueue) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - - List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> s.forEach(entry -> { - Session session = entry.getValue(); + Stream.concat(sessions.values().stream(), deferredCleanupQueue.stream()).forEach(session -> { if (session instanceof ScanSession) { ScanSession<?> scanSession = (ScanSession<?>) session; boolean isSingle = session instanceof SingleScanSession; @@ -459,9 +434,10 @@ public class SessionManager { isSingle ? ((SingleScanSession) scanSession).extent : ((MultiScanSession) scanSession).threadPoolExtent, ct, isSingle ? ScanType.SINGLE : ScanType.BATCH, - computeScanState(scanSession.getScanTask()), scanSession.scanParams, entry.getKey()); + computeScanState(scanSession.getScanTask()), scanSession.scanParams, + session.getSessionId()); } - })); + }); return activeScans; } diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index 399a208d0a..60482e020c 100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER; import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -41,9 +42,12 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.ScanType; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -240,6 +244,10 @@ public class ZombieScanIT extends ConfigurableMacBase { } assertEquals(4, tabletSeversWithZombieScans.size()); + // This check may be outside the scope of this test but works nicely for this check and is + // simple enough to include + assertValidScanIds(c); + executor.shutdownNow(); } @@ -400,4 +408,25 @@ public class ZombieScanIT extends ConfigurableMacBase { .filter(metric -> metric.getName().equals(SCAN_ZOMBIE_THREADS.getName())) .mapToInt(metric -> Integer.parseInt(metric.getValue())).max().orElse(-1); } + + /** + * Ensure that the scan session ids are valid (should not expect 0 or -1). 0 would mean the id was + * never set (previously existing bug). -1 previously indicated that the scan session was no + * longer tracking the id (occurred for scans when cleanup was attempted but deferred for later) + */ + private void assertValidScanIds(AccumuloClient c) + throws AccumuloException, AccumuloSecurityException { + Set<Long> scanIds = new HashSet<>(); + Set<ScanType> scanTypes = new HashSet<>(); + for (String tserver : c.instanceOperations().getTabletServers()) { + c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> { + scanIds.add(activeScan.getScanid()); + scanTypes.add(activeScan.getType()); + }); + } + assertNotEquals(0, scanIds.size()); + scanIds.forEach(id -> assertTrue(id != 0L && id != -1L)); + // ensure coverage of both batch and single scans + assertEquals(scanTypes, Set.of(ScanType.SINGLE, ScanType.BATCH)); + } }