This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 6555fc12db avoid clearing server in cache when scanner closed and interrupted (#5313)
6555fc12db is described below

commit 6555fc12dbe753197555feacb6c701c4e7dd2ebe
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Feb 14 17:26:44 2025 -0500

    avoid clearing server in cache when scanner closed and interrupted (#5313)
    
    When the thread running an Accumulo scanner is interrupted, it may cause
    the server the scanner was reading from to be cleared from the cache.
    This can be disruptive when the server is actually fine. This change
    makes a narrow exception to clearing the server from the cache for the
    case where the scanner was closed and an interrupt was seen. The
    exception is deliberately narrow so that the cache is still invalidated
    when there is actually a problem with the server.
---
 .../org/apache/accumulo/core/clientImpl/ScannerIterator.java |  2 ++
 .../org/apache/accumulo/core/clientImpl/ThriftScanner.java   | 12 ++++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index bf8f01687f..d1eca1b198 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -124,6 +124,8 @@ public class ScannerIterator implements 
Iterator<Entry<Key,Value>> {
   }
 
   void close() {
+    // setting this so that some errors can be ignored
+    scanState.closeInitiated = true;
     // run actual close operation in the background so this does not block.
     context.executeCleanupTask(() -> {
       synchronized (scanState) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 5894e3f2df..0447e5d423 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.security.SecureRandom;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -212,6 +213,8 @@ public class ThriftScanner {
 
     Duration busyTimeout;
 
+    volatile boolean closeInitiated = false;
+
     TabletLocation getErrorLocation() {
       return prevLoc;
     }
@@ -508,8 +511,13 @@ public class ThriftScanner {
           TraceUtil.setException(child2, e, false);
           sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
         } catch (TException e) {
-          TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(context,
-              loc.tablet_location);
+          boolean wasInterruptedAfterClose =
+              e.getCause() != null && 
e.getCause().getClass().equals(InterruptedIOException.class)
+                  && scanState.closeInitiated;
+          if (!wasInterruptedAfterClose) {
+            TabletLocator.getLocator(context, 
scanState.tableId).invalidateCache(context,
+                loc.tablet_location);
+          }
           error = "Scan failed, thrift error " + e.getClass().getName() + "  " 
+ e.getMessage()
               + " " + scanState.getErrorLocation();
           if (!error.equals(lastError)) {

Reply via email to