This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 01fd2ccb25 scan session id now available when session cleanup is 
deferred (#4902)
01fd2ccb25 is described below

commit 01fd2ccb25a3bc7d673308dddb8b88dd4c032b04
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Thu Oct 3 09:51:59 2024 -0400

    scan session id now available when session cleanup is deferred (#4902)
    
    Previously, when cleanup of a scan session was attempted but could not 
complete and was deferred for later, the scan session was still visible from 
things like the `listscans` command, but its id was reported as -1. This change:
    - makes it so that the session id is still available in this situation
    - improves logging related to scan session ids and logs the scan session 
ids in more places
    - tests that invalid (0 = never set, -1 = no longer tracking) scan session 
ids are no longer seen
    closes #4842
---
 .../accumulo/core/clientImpl/ThriftScanner.java    |  5 +-
 .../org/apache/accumulo/tserver/ScanServer.java    | 19 ++++----
 .../accumulo/tserver/ThriftScanClientHandler.java  | 11 +++--
 .../apache/accumulo/tserver/session/Session.java   | 18 +++++++-
 .../accumulo/tserver/session/SessionManager.java   | 54 ++++++----------------
 .../org/apache/accumulo/test/ZombieScanIT.java     | 29 ++++++++++++
 6 files changed, 77 insertions(+), 59 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index e70b0637d8..4703ac8c08 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -389,8 +389,8 @@ public class ThriftScanner {
       // call the scan server selector and just go back to the previous scan 
server
       addr = scanState.prevLoc;
       log.trace(
-          "For tablet {} continuing scan on scan server {} without consulting 
scan server selector, using busyTimeout {}",
-          loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
+          "For tablet {} continuing scan {} on scan server {} without 
consulting scan server selector, using busyTimeout {}",
+          loc.getExtent(), scanState.scanID, addr.serverAddress, 
scanState.busyTimeout);
     } else {
       var tabletId = new TabletIdImpl(loc.getExtent());
       // obtain a snapshot once and only expose this snapshot to the plugin 
for consistency
@@ -898,7 +898,6 @@ public class ThriftScanner {
         }
 
       } else {
-        // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc);
         String msg =
             "Continuing scan tserver=" + addr.serverAddress + " scanid=" + 
scanState.scanID;
         Thread.currentThread().setName(msg);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 319241dc41..4dc21eb487 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -136,7 +136,7 @@ import com.google.common.net.HostAndPort;
 public class ScanServer extends AbstractServer
     implements TabletScanClientService.Iface, TabletHostingServer {
 
-  private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
 
   private static class TabletMetadataLoader implements 
CacheLoader<KeyExtent,TabletMetadata> {
 
@@ -173,8 +173,6 @@ public class ScanServer extends AbstractServer
     }
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
-
   protected ThriftScanClientHandler delegate;
   private UUID serverLockUUID;
   private final TabletMetadataLoader tabletMetadataLoader;
@@ -212,8 +210,8 @@ public class ScanServer extends AbstractServer
     super("sserver", opts, ServerContext::new, args);
 
     context = super.getContext();
-    log.info("Version " + Constants.VERSION);
-    log.info("Instance " + getContext().getInstanceID());
+    LOG.info("Version " + Constants.VERSION);
+    LOG.info("Instance " + getContext().getInstanceID());
     this.sessionManager = new SessionManager(context);
 
     this.resourceManager = new TabletServerResourceManager(context, this);
@@ -433,11 +431,11 @@ public class ScanServer extends AbstractServer
         // thread to look for log sorting work in the future
         logSorter.startWatchingForRecoveryLogs(threadPoolSize);
       } catch (Exception ex) {
-        log.error("Error starting LogSorter");
+        LOG.error("Error starting LogSorter");
         throw new RuntimeException(ex);
       }
     } else {
-      log.info(
+      LOG.info(
           "Log sorting for tablet recovery is disabled, 
SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
     }
 
@@ -987,6 +985,7 @@ public class ScanServer extends AbstractServer
           batchTimeOut, classLoaderContext, executionHints, 
getScanTabletResolver(tablet),
           busyTimeout);
 
+      LOG.trace("started scan: {}", is.getScanID());
       return is;
     } catch (ScanServerBusyException be) {
       scanServerMetrics.incrementBusy();
@@ -1058,16 +1057,16 @@ public class ScanServer extends AbstractServer
           ssio, authorizations, waitForWrites, tSamplerConfig, batchTimeOut, 
contextArg,
           executionHints, getBatchScanTabletResolver(tablets), busyTimeout);
 
-      LOG.trace("started scan: {}", ims.getScanID());
+      LOG.trace("started multi scan: {}", ims.getScanID());
       return ims;
     } catch (ScanServerBusyException be) {
       scanServerMetrics.incrementBusy();
       throw be;
     } catch (TException e) {
-      LOG.error("Error starting scan", e);
+      LOG.error("Error starting multi scan", e);
       throw e;
     } catch (AccumuloException e) {
-      LOG.error("Error starting scan", e);
+      LOG.error("Error starting multi scan", e);
       throw new RuntimeException(e);
     }
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index ee4cab0508..9344b1c4ce 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -336,8 +336,8 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
       long t2 = System.currentTimeMillis();
 
       if (log.isTraceEnabled()) {
-        log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, 
nbTimes = [%s] ",
-            TServerUtils.clientAddress.get(), ss.extent.tableId(), 
ss.entriesReturned,
+        log.trace(String.format("ScanSess %d tid %s %s %,d entries in %.2f 
secs, nbTimes = [%s] ",
+            scanID, TServerUtils.clientAddress.get(), ss.extent.tableId(), 
ss.entriesReturned,
             (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
       }
 
@@ -533,10 +533,11 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
 
     if (log.isTraceEnabled()) {
       log.trace(String.format(
-          "MultiScanSess %s %,d entries in %.2f secs"
+          "MultiScanSess %d %s %,d entries in %.2f secs"
               + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
-          TServerUtils.clientAddress.get(), session.numEntries, (t2 - 
session.startTime) / 1000.0,
-          session.totalLookupTime / 1000.0, session.numTablets, 
session.numRanges));
+          scanID, TServerUtils.clientAddress.get(), session.numEntries,
+          (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, 
session.numTablets,
+          session.numRanges));
     }
   }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 29247247c3..fda1395405 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -18,12 +18,15 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
+import com.google.common.base.Preconditions;
+
 public class Session {
 
   enum State {
@@ -37,6 +40,7 @@ public class Session {
   boolean allowReservation = true;
   private final Timer stateChangeTimer = Timer.startNew();
   private final TCredentials credentials;
+  private OptionalLong sessionId = OptionalLong.empty();
 
   Session(TCredentials credentials) {
     this.credentials = credentials;
@@ -66,13 +70,23 @@ public class Session {
     return state;
   }
 
+  public void setSessionId(long sessionId) {
+    Preconditions.checkState(this.sessionId.isEmpty());
+    this.sessionId = OptionalLong.of(sessionId);
+  }
+
+  public long getSessionId() {
+    Preconditions.checkState(this.sessionId.isPresent());
+    return sessionId.orElseThrow();
+  }
+
   public long elaspedSinceStateChange(TimeUnit unit) {
     return stateChangeTimer.elapsed(unit);
   }
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + " " + state + " startTime:" + 
startTime + " lastAccessTime:"
-        + lastAccessTime + " client:" + client;
+    return getClass().getSimpleName() + " " + state + " sessionId:" + 
sessionId + " startTime:"
+        + startTime + " lastAccessTime:" + lastAccessTime + " client:" + 
client;
   }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 564f25c019..3a7df5347d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -24,14 +24,11 @@ import static 
org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +58,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 public class SessionManager {
   private static final Logger log = 
LoggerFactory.getLogger(SessionManager.class);
@@ -70,7 +66,6 @@ public class SessionManager {
   private final long maxIdle;
   private final long maxUpdateIdle;
   private final BlockingQueue<Session> deferredCleanupQueue = new 
ArrayBlockingQueue<>(5000);
-  private final Long expiredSessionMarker = (long) -1;
   private final ServerContext ctx;
   private volatile LongConsumer zombieCountConsumer = null;
 
@@ -93,10 +88,10 @@ public class SessionManager {
       Preconditions.checkArgument(session.getState() == State.NEW);
       session.setState(reserve ? State.RESERVED : State.UNRESERVED);
       session.startTime = session.lastAccessTime = System.currentTimeMillis();
-    }
-
-    while (sessions.putIfAbsent(sid, session) != null) {
-      sid = RANDOM.get().nextLong();
+      while (sessions.putIfAbsent(sid, session) != null) {
+        sid = RANDOM.get().nextLong();
+      }
+      session.setSessionId(sid);
     }
 
     return sid;
@@ -312,8 +307,8 @@ public class SessionManager {
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
           if (idleTime > configuredIdle) {
-            log.info("Closing idle session from user={}, client={}, 
idle={}ms", session.getUser(),
-                session.client, idleTime);
+            log.info("Closing idle session {} from user={}, client={}, 
idle={}ms",
+                session.getSessionId(), session.getUser(), session.client, 
idleTime);
             iter.remove();
             sessionsToCleanup.add(session);
             session.setState(State.REMOVED);
@@ -382,8 +377,9 @@ public class SessionManager {
             }
 
             if (shouldRemove) {
-              log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client="
-                  + session2.client + ", duration=" + delay + "ms");
+              log.info("Closing not accessed session " + 
session2.getSessionId() + " from user="
+                  + session2.getUser() + ", client=" + session2.client + ", 
duration=" + delay
+                  + "ms");
               sessions.remove(sessionId);
               cleanup(session2);
             }
@@ -399,18 +395,7 @@ public class SessionManager {
   public Map<TableId,MapCounter<ScanRunState>> getActiveScansPerTable() {
     Map<TableId,MapCounter<ScanRunState>> counts = new HashMap<>();
 
-    Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
-
-    /*
-     * Add sessions so that get the list returned in the active scans call
-     */
-    for (Session session : deferredCleanupQueue) {
-      copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
-    }
-
-    List.of(sessions.entrySet(), copiedIdleSessions).forEach(set -> 
set.forEach(entry -> {
-
-      Session session = entry.getValue();
+    Stream.concat(sessions.values().stream(), 
deferredCleanupQueue.stream()).forEach(session -> {
       ScanTask<?> nbt = null;
       TableId tableID = null;
 
@@ -430,7 +415,7 @@ public class SessionManager {
           counts.computeIfAbsent(tableID, unusedKey -> new 
MapCounter<>()).increment(srs, 1);
         }
       }
-    }));
+    });
 
     return counts;
   }
@@ -439,18 +424,8 @@ public class SessionManager {
 
     final List<ActiveScan> activeScans = new ArrayList<>();
     final long ct = System.currentTimeMillis();
-    final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
-
-    /*
-     * Add sessions that get the list returned in the active scans call
-     */
-    for (Session session : deferredCleanupQueue) {
-      copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
-    }
-
-    List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> 
s.forEach(entry -> {
-      Session session = entry.getValue();
 
+    Stream.concat(sessions.values().stream(), 
deferredCleanupQueue.stream()).forEach(session -> {
       if (session instanceof ScanSession) {
         ScanSession<?> scanSession = (ScanSession<?>) session;
         boolean isSingle = session instanceof SingleScanSession;
@@ -459,9 +434,10 @@ public class SessionManager {
             isSingle ? ((SingleScanSession) scanSession).extent
                 : ((MultiScanSession) scanSession).threadPoolExtent,
             ct, isSingle ? ScanType.SINGLE : ScanType.BATCH,
-            computeScanState(scanSession.getScanTask()), 
scanSession.scanParams, entry.getKey());
+            computeScanState(scanSession.getScanTask()), 
scanSession.scanParams,
+            session.getSessionId());
       }
-    }));
+    });
 
     return activeScans;
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java 
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index 399a208d0a..60482e020c 100644
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@ -24,6 +24,7 @@ import static 
org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
 import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -41,9 +42,12 @@ import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.ScanType;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
@@ -240,6 +244,10 @@ public class ZombieScanIT extends ConfigurableMacBase {
       }
       assertEquals(4, tabletSeversWithZombieScans.size());
 
+      // This check may be outside the scope of this test but works nicely for 
this check and is
+      // simple enough to include
+      assertValidScanIds(c);
+
       executor.shutdownNow();
     }
 
@@ -400,4 +408,25 @@ public class ZombieScanIT extends ConfigurableMacBase {
         .filter(metric -> 
metric.getName().equals(SCAN_ZOMBIE_THREADS.getName()))
         .mapToInt(metric -> 
Integer.parseInt(metric.getValue())).max().orElse(-1);
   }
+
+  /**
+   * Ensure that the scan session ids are valid (should not expect 0 or -1). 0 
would mean the id was
+   * never set (previously existing bug). -1 previously indicated that the 
scan session was no
+   * longer tracking the id (occurred for scans when cleanup was attempted but 
deferred for later)
+   */
+  private void assertValidScanIds(AccumuloClient c)
+      throws AccumuloException, AccumuloSecurityException {
+    Set<Long> scanIds = new HashSet<>();
+    Set<ScanType> scanTypes = new HashSet<>();
+    for (String tserver : c.instanceOperations().getTabletServers()) {
+      c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> {
+        scanIds.add(activeScan.getScanid());
+        scanTypes.add(activeScan.getType());
+      });
+    }
+    assertNotEquals(0, scanIds.size());
+    scanIds.forEach(id -> assertTrue(id != 0L && id != -1L));
+    // ensure coverage of both batch and single scans
+    assertEquals(scanTypes, Set.of(ScanType.SINGLE, ScanType.BATCH));
+  }
 }

Reply via email to