This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f65e15d954 Improves concurrency of client side tablet location cache 
(#5423)
f65e15d954 is described below

commit f65e15d954669fe3419ec403a4659cd44e926478
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Apr 4 10:09:45 2025 -0400

    Improves concurrency of client side tablet location cache (#5423)
    
    The tablet location cache before these changes used a read/write lock
    and had two problems.
    
     1. Only one metadata tablet lookup could happen at a time per table
        to find a tablet's location.
     2. When a metadata lookup was happening for a table it would block all
        reads for the cache, even if the needed location was already in the
        cache.
    
    This commit makes the following changes.
    
     1. Removes the read/write lock
     2. Uses a concurrent skip list for the cache instead of a tree map to
        support concurrent reads and writes to the cache's data structure.
     3. Adds per-metadata-tablet locking for metadata table reads.
     4. Removes deferred invalidation as a lot of this was related to the
        previous locking structure.  Now when something is invalidated, the
        concurrent skip list is immediately updated w/o any locking.
    
    The changes offer the following new behavior for the cache.
    
     1. If the location needed by a client thread is already in the cache,
        then it will never block now even if another thread is doing a
        metadata table lookup.
     2. The cache will allow a concurrent metadata lookup for each metadata
        tablet a table has data in.  For example if a user table has 10,000
        tablets with tablet metadata stored in 20 metadata tablets, then the
        cache will now allow up to 20 concurrent metadata lookups (one per
        metadata tablet).
     3. If a client thread finds that a tablet location in the cache is no
        longer valid, invalidating it will not cause any other threads to
        block unless they need that specific tablet location.
    
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../core/clientImpl/ClientTabletCacheImpl.java     | 408 +++++++++------------
 .../metadata/MetadataCachedTabletObtainer.java     |  68 +---
 .../org/apache/accumulo/core/util/LockMap.java     | 114 ++++++
 .../core/clientImpl/ClientTabletCacheImplTest.java | 346 ++++++++++++++---
 .../org/apache/accumulo/core/util/LockMapTest.java | 135 +++++++
 5 files changed, 705 insertions(+), 366 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
index b1f0f913d3..f382be10cf 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
@@ -31,13 +31,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -57,6 +55,7 @@ import 
org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.LockMap;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.Timer;
@@ -68,8 +67,49 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
+/**
+ * This class has two concurrency goals. First when a thread request data that 
is currently present
+ * in the cache, it should never block. Second when a thread request data that 
is not in the cache
+ * only one lookup per metadata tablet will happen concurrently. The purpose 
of this second goal is
+ * to avoid redundant concurrent metadata lookups in a client process.
+ *
+ * <p>
+ * The first goal is achieved by using a ConcurrentSkipListMap to store the 
caches data making it
+ * safe for multiple threads to read and write to the map. The second goal is 
achieved by using a
+ * {@link LockMap} keyed on metadata table extents when doing metadata table 
lookups.
+ *
+ * <p>
+ * Below is an example of how this cache is intended to work.
+ *
+ * <ol>
+ * <li>Thread_1 lookups up row A that is not currently present in the cache.
+ * <li>Thread_2 lookups up row C that is not currently present in the cache.
+ * <li>Thread_3 lookups up row Q that is not currently present in the cache.
+ * <li>Thread_1 finds metadata tablet MT1 stores information on row A and 
locks the extent for MT1.
+ * <li>Thread_2 finds metadata tablet MT1 stores information on row C and 
locks the extent for MT1.
+ * <li>Thread_3 finds metadata tablet MT2 stores information on row Q and 
locks the extent for MT2.
+ * <li>Thread_1 acquires the lock for MT1
+ * <li>Thread_2 blocks waiting to lock MT1
+ * <li>Thread_3 acquires the lock for MT2
+ * <li>Thread_4 finds row Z in the cache and immediately returns its user 
tablet information. If
+ * this data was not cached, it would have needed to read metadata tablet MT2 
which is currently
+ * locked and would have blocked.
+ * <li>Thread_1 reads user_tablet_1_metadata that contains row A from MT1 and 
adds it to the cache.
+ * It also opportunistically reads a few more user tablets metadata from MT1 
after the first user
+ * tablet adds them the cache.
+ * <li>Thread_3 reads user_tablet_10_metadata that contains row Q from MT2 and 
adds it to the cache.
+ * <li>Thread_1 finds user_tablet_1_metadata in the cache and returns it as 
the tablet for row A.
+ * <li>Thread_1 unlocks the lock for MT1
+ * <li>Thread_3 finds user_tablet_10_metadata in the cache and returns it as 
the tablet for row Q.
+ * <li>Thread_3 unlocks the lock for MT2
+ * <li>Thread_2 acquires the lock for MT1
+ * <li>Thread_2 checks the cache and finds the information it needs is now 
present in the cache
+ * because it was found by Thread_1. No metadata lookup is done, the 
information from the cache is
+ * returned.
+ * <li>Thread_2 unlocks the lock for MT1
+ * </ol>
+ *
+ */
 public class ClientTabletCacheImpl extends ClientTabletCache {
 
   private static final Logger log = 
LoggerFactory.getLogger(ClientTabletCacheImpl.class);
@@ -95,26 +135,21 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
   protected final TableId tableId;
   protected final ClientTabletCache parent;
-  protected final TreeMap<Text,CachedTablet> metaCache = new 
TreeMap<>(END_ROW_COMPARATOR);
+  protected final ConcurrentSkipListMap<Text,CachedTablet> metaCache =
+      new ConcurrentSkipListMap<>(END_ROW_COMPARATOR);
   protected final CachedTabletObtainer tabletObtainer;
   private final TabletServerLockChecker lockChecker;
   protected final Text lastTabletRow;
 
-  private final TreeSet<KeyExtent> badExtents = new TreeSet<>();
-  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-  private final Lock rLock = rwLock.readLock();
-  private final Lock wLock = rwLock.writeLock();
   private final AtomicLong tabletHostingRequestCount = new AtomicLong(0);
 
+  private final LockMap<KeyExtent> lookupLocks = new LockMap<>();
+
   public interface CachedTabletObtainer {
     /**
      * @return null when unable to read information successfully
      */
-    CachedTablets lookupTablet(ClientContext context, CachedTablet src, Text 
row, Text stopRow,
-        ClientTabletCache parent) throws AccumuloSecurityException, 
AccumuloException;
-
-    List<CachedTablet> lookupTablets(ClientContext context, String tserver,
-        Map<KeyExtent,List<Range>> map, ClientTabletCache parent)
+    CachedTablets lookupTablet(ClientContext context, CachedTablet src, Text 
row, Text stopRow)
         throws AccumuloSecurityException, AccumuloException;
   }
 
@@ -203,27 +238,20 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
     LockCheckerSession lcSession = new LockCheckerSession();
 
-    rLock.lock();
-    try {
-      processInvalidated(context, lcSession);
-
-      // for this to be efficient rows need to be in sorted order, but always 
sorting is slow...
-      // therefore only sort the
-      // stuff not in the cache.... it is most efficient to pass _locateTablet 
rows in sorted order
+    // for this to be efficient rows need to be in sorted order, but always 
sorting is slow...
+    // therefore only sort the
+    // stuff not in the cache.... it is most efficient to pass _locateTablet 
rows in sorted order
 
-      // For this to be efficient, need to avoid fine grained synchronization 
and fine grained
-      // logging.
-      // Therefore methods called by this are not synchronized and should not 
log.
+    // For this to be efficient, need to avoid fine grained synchronization 
and fine grained
+    // logging.
+    // Therefore methods called by this are not synchronized and should not 
log.
 
-      for (T mutation : mutations) {
-        row.set(mutation.getRow());
-        CachedTablet tl = findTabletInCache(row);
-        if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
-          notInCache.add(mutation);
-        }
+    for (T mutation : mutations) {
+      row.set(mutation.getRow());
+      CachedTablet tl = findTabletInCache(row);
+      if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
+        notInCache.add(mutation);
       }
-    } finally {
-      rLock.unlock();
     }
 
     HashSet<CachedTablet> locationLess = new HashSet<>();
@@ -232,29 +260,24 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
       notInCache.sort((o1, o2) -> WritableComparator.compareBytes(o1.getRow(), 
0,
           o1.getRow().length, o2.getRow(), 0, o2.getRow().length));
 
-      wLock.lock();
-      try {
-        // Want to ignore any entries in the cache w/o a location that were 
created before the
-        // following time. Entries created after the following time may have 
been populated by the
-        // following loop, and we want to use those.
-        Timer cacheCutoffTimer = Timer.startNew();
+      // Want to ignore any entries in the cache w/o a location that were 
created before the
+      // following time. Entries created after the following time may have 
been populated by the
+      // following loop, and we want to use those.
+      Timer cacheCutoffTimer = Timer.startNew();
 
-        for (T mutation : notInCache) {
+      for (T mutation : notInCache) {
 
-          row.set(mutation.getRow());
+        row.set(mutation.getRow());
 
-          CachedTablet tl = _findTablet(context, row, false, false, false, 
lcSession,
-              LocationNeed.REQUIRED, cacheCutoffTimer);
+        CachedTablet tl =
+            _findTablet(context, row, false, lcSession, LocationNeed.REQUIRED, 
cacheCutoffTimer);
 
-          if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
-            failures.add(mutation);
-            if (tl != null && tl.getTserverLocation().isEmpty()) {
-              locationLess.add(tl);
-            }
+        if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
+          failures.add(mutation);
+          if (tl != null && tl.getTserverLocation().isEmpty()) {
+            locationLess.add(tl);
           }
         }
-      } finally {
-        wLock.unlock();
       }
     }
 
@@ -346,8 +369,7 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
       if (useCache) {
         tl = lcSession.checkLock(findTabletInCache(startRow));
       } else {
-        tl = _findTablet(context, startRow, false, false, false, lcSession, 
locationNeed,
-            cacheCutoffTimer);
+        tl = _findTablet(context, startRow, false, lcSession, locationNeed, 
cacheCutoffTimer);
       }
 
       if (tl == null) {
@@ -365,8 +387,8 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
           row.append(new byte[] {0}, 0, 1);
           tl = lcSession.checkLock(findTabletInCache(row));
         } else {
-          tl = _findTablet(context, tl.getExtent().endRow(), true, false, 
false, lcSession,
-              locationNeed, cacheCutoffTimer);
+          tl = _findTablet(context, tl.getExtent().endRow(), true, lcSession, 
locationNeed,
+              cacheCutoffTimer);
         }
 
         if (tl == null) {
@@ -425,20 +447,12 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
     LockCheckerSession lcSession = new LockCheckerSession();
 
     List<Range> failures;
-    rLock.lock();
-    try {
-      processInvalidated(context, lcSession);
-
-      // for this to be optimal, need to look ranges up in sorted order when
-      // ranges are not present in cache... however do not want to always
-      // sort ranges... therefore try binning ranges using only the cache
-      // and sort whatever fails and retry
-
-      failures = findTablets(context, ranges, rangeConsumer, true, lcSession, 
locationNeed,
-          keyExtent -> {});
-    } finally {
-      rLock.unlock();
-    }
+    // for this to be optimal, need to look ranges up in sorted order when
+    // ranges are not present in cache... however do not want to always
+    // sort ranges... therefore try binning ranges using only the cache
+    // and sort whatever fails and retry
+    failures =
+        findTablets(context, ranges, rangeConsumer, true, lcSession, 
locationNeed, keyExtent -> {});
 
     if (!failures.isEmpty()) {
       // sort failures by range start key
@@ -455,17 +469,10 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
       }
 
       // try lookups again
-      wLock.lock();
-      try {
-
-        failures = findTablets(context, failures, rangeConsumer, false, 
lcSession, locationNeed,
-            locationLessConsumer);
-      } finally {
-        wLock.unlock();
-      }
+      failures = findTablets(context, failures, rangeConsumer, false, 
lcSession, locationNeed,
+          locationLessConsumer);
 
       requestTabletHosting(context, locationLess);
-
     }
 
     if (timer != null) {
@@ -478,12 +485,7 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
   @Override
   public void invalidateCache(KeyExtent failedExtent) {
-    wLock.lock();
-    try {
-      badExtents.add(failedExtent);
-    } finally {
-      wLock.unlock();
-    }
+    removeOverlapping(metaCache, failedExtent);
     if (log.isTraceEnabled()) {
       log.trace("Invalidated extent={}", failedExtent);
     }
@@ -491,12 +493,7 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
   @Override
   public void invalidateCache(Collection<KeyExtent> keySet) {
-    wLock.lock();
-    try {
-      badExtents.addAll(keySet);
-    } finally {
-      wLock.unlock();
-    }
+    keySet.forEach(extent -> removeOverlapping(metaCache, extent));
     if (log.isTraceEnabled()) {
       log.trace("Invalidated {} cache entries for table {}", keySet.size(), 
tableId);
     }
@@ -504,14 +501,8 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
   @Override
   public void invalidateCache() {
-    int invalidatedCount;
-    wLock.lock();
-    try {
-      invalidatedCount = metaCache.size();
-      metaCache.clear();
-    } finally {
-      wLock.unlock();
-    }
+    int invalidatedCount = metaCache.size();
+    metaCache.clear();
     this.tabletHostingRequestCount.set(0);
     if (log.isTraceEnabled()) {
       log.trace("invalidated all {} cache entries for table={}", 
invalidatedCount, tableId);
@@ -533,8 +524,7 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
     }
 
     LockCheckerSession lcSession = new LockCheckerSession();
-    CachedTablet tl =
-        _findTablet(context, row, skipRow, false, true, lcSession, 
locationNeed, Timer.startNew());
+    CachedTablet tl = _findTablet(context, row, skipRow, lcSession, 
locationNeed, Timer.startNew());
 
     if (timer != null) {
       log.trace("tid={} Located tablet {} at {} in {}", 
Thread.currentThread().getId(),
@@ -594,8 +584,8 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
           break;
         }
 
-        CachedTablet followingTablet = _findTablet(context, 
currTablet.endRow(), true, false, true,
-            lcSession, locationNeed, cacheCutoffTimer);
+        CachedTablet followingTablet = _findTablet(context, 
currTablet.endRow(), true, lcSession,
+            locationNeed, cacheCutoffTimer);
 
         if (followingTablet == null) {
           break;
@@ -692,64 +682,90 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
     }
   }
 
-  private void lookupTablet(ClientContext context, Text row, boolean retry,
-      LockCheckerSession lcSession) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException {
+  private void lookupTablet(ClientContext context, Text row, 
LockCheckerSession lcSession)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
     Text metadataRow = new Text(tableId.canonical());
     metadataRow.append(new byte[] {';'}, 0, 1);
     metadataRow.append(row.getBytes(), 0, row.getLength());
     CachedTablet ptl = parent.findTablet(context, metadataRow, false, 
LocationNeed.REQUIRED);
 
     if (ptl != null) {
-      CachedTablets cachedTablets =
-          tabletObtainer.lookupTablet(context, ptl, metadataRow, 
lastTabletRow, parent);
-      while (cachedTablets != null && 
cachedTablets.getCachedTablets().isEmpty()) {
-        // try the next tablet, the current tablet does not have any tablets 
that overlap the row
-        Text er = ptl.getExtent().endRow();
-        if (er != null && er.compareTo(lastTabletRow) < 0) {
-          // System.out.println("er "+er+" ltr "+lastTabletRow);
-          ptl = parent.findTablet(context, er, true, LocationNeed.REQUIRED);
-          if (ptl != null) {
-            cachedTablets =
-                tabletObtainer.lookupTablet(context, ptl, metadataRow, 
lastTabletRow, parent);
-          } else {
-            break;
+      // Only allow a single lookup at time per parent tablet. For example if 
a tables tablets are
+      // all stored in three metadata tablets, then that table could have up 
to three concurrent
+      // metadata lookups.
+      Timer timer = Timer.startNew();
+      try (var unused = lookupLocks.lock(ptl.getExtent())) {
+        // See if entry was added to cache by another thread while we were 
waiting on the lock
+        var cached = findTabletInCache(row);
+        if (cached != null && cached.getCreationTimer().startedAfter(timer)) {
+          // This cache entry was added after we started waiting on the lock 
so lets use it and not
+          // go to the metadata table. This means another thread was holding 
the lock and doing
+          // metadata lookups when we requested the lock.
+          return;
+        }
+        // Lookup tablets in metadata table and update cache. Also updating 
the cache while holding
+        // the lock is important as it ensures other threads that are waiting 
on the lock will see
+        // what this thread found and may be able to avoid metadata lookups.
+        lookupTablet(context, lcSession, ptl, metadataRow);
+      }
+    }
+  }
+
+  private void lookupTablet(ClientContext context, LockCheckerSession 
lcSession, CachedTablet ptl,
+      Text metadataRow) throws AccumuloSecurityException, AccumuloException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
+    CachedTablets cachedTablets =
+        tabletObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow);
+    if (cachedTablets == null) {
+      parent.invalidateCache(ptl.getExtent());
+    }
+    while (cachedTablets != null && 
cachedTablets.getCachedTablets().isEmpty()) {
+      // try the next tablet, the current tablet does not have any tablets 
that overlap the row
+      Text er = ptl.getExtent().endRow();
+      if (er != null && er.compareTo(lastTabletRow) < 0) {
+        // System.out.println("er "+er+" ltr "+lastTabletRow);
+        ptl = parent.findTablet(context, er, true, LocationNeed.REQUIRED);
+        if (ptl != null) {
+          cachedTablets = tabletObtainer.lookupTablet(context, ptl, 
metadataRow, lastTabletRow);
+          if (cachedTablets == null) {
+            parent.invalidateCache(ptl.getExtent());
           }
         } else {
           break;
         }
+      } else {
+        break;
       }
+    }
 
-      if (cachedTablets == null) {
-        return;
-      }
+    if (cachedTablets == null) {
+      return;
+    }
 
-      // cannot assume the list contains contiguous key extents... so it is 
probably
-      // best to deal with each extent individually
+    // cannot assume the list contains contiguous key extents... so it is 
probably
+    // best to deal with each extent individually
 
-      Text lastEndRow = null;
-      for (CachedTablet cachedTablet : cachedTablets.getCachedTablets()) {
+    Text lastEndRow = null;
+    for (CachedTablet cachedTablet : cachedTablets.getCachedTablets()) {
 
-        KeyExtent ke = cachedTablet.getExtent();
-        CachedTablet locToCache;
+      KeyExtent ke = cachedTablet.getExtent();
+      CachedTablet locToCache;
 
-        // create new location if current prevEndRow == endRow
-        if ((lastEndRow != null) && (ke.prevEndRow() != null)
-            && ke.prevEndRow().equals(lastEndRow)) {
-          locToCache = new CachedTablet(new KeyExtent(ke.tableId(), 
ke.endRow(), lastEndRow),
-              cachedTablet.getTserverLocation(), 
cachedTablet.getTserverSession(),
-              cachedTablet.getAvailability(), 
cachedTablet.wasHostingRequested());
-        } else {
-          locToCache = cachedTablet;
-        }
+      // create new location if current prevEndRow == endRow
+      if ((lastEndRow != null) && (ke.prevEndRow() != null) && 
ke.prevEndRow().equals(lastEndRow)) {
+        locToCache = new CachedTablet(new KeyExtent(ke.tableId(), ke.endRow(), 
lastEndRow),
+            cachedTablet.getTserverLocation(), 
cachedTablet.getTserverSession(),
+            cachedTablet.getAvailability(), 
cachedTablet.wasHostingRequested());
+      } else {
+        locToCache = cachedTablet;
+      }
 
-        // save endRow for next iteration
-        lastEndRow = locToCache.getExtent().endRow();
+      // save endRow for next iteration
+      lastEndRow = locToCache.getExtent().endRow();
 
-        updateCache(locToCache, lcSession);
-      }
+      updateCache(locToCache, lcSession);
     }
-
   }
 
   private void updateCache(CachedTablet cachedTablet, LockCheckerSession 
lcSession) {
@@ -773,13 +789,9 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
       er = MAX_TEXT;
     }
     metaCache.put(er, cachedTablet);
-
-    if (!badExtents.isEmpty()) {
-      removeOverlapping(badExtents, cachedTablet.getExtent());
-    }
   }
 
-  static void removeOverlapping(TreeMap<Text,CachedTablet> metaCache, 
KeyExtent nke) {
+  static void removeOverlapping(NavigableMap<Text,CachedTablet> metaCache, 
KeyExtent nke) {
     Iterator<Entry<Text,CachedTablet>> iter;
 
     if (nke.prevEndRow() == null) {
@@ -814,12 +826,6 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
     return row;
   }
 
-  static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
-    for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) {
-      extents.remove(overlapping);
-    }
-  }
-
   private CachedTablet findTabletInCache(Text row) {
 
     Entry<Text,CachedTablet> entry = metaCache.ceilingEntry(row);
@@ -839,117 +845,35 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
    *        we should instead ignore them and reread the tablet information 
from the metadata table.
    */
   protected CachedTablet _findTablet(ClientContext context, Text row, boolean 
skipRow,
-      boolean retry, boolean lock, LockCheckerSession lcSession, LocationNeed 
locationNeed,
-      Timer cacheCutoffTimer) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException {
-
+      LockCheckerSession lcSession, LocationNeed locationNeed, Timer 
cacheCutoffTimer)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
     if (skipRow) {
       row = new Text(row);
       row.append(new byte[] {0}, 0, 1);
     }
 
-    CachedTablet tl;
-
-    if (lock) {
-      rLock.lock();
-      try {
-        tl = processInvalidatedAndCheckLock(context, lcSession, row);
-      } finally {
-        rLock.unlock();
-      }
-    } else {
-      tl = processInvalidatedAndCheckLock(context, lcSession, row);
-    }
+    CachedTablet tl = lcSession.checkLock(findTabletInCache(row));
 
     if (tl == null || (locationNeed == LocationNeed.REQUIRED && 
tl.getTserverLocation().isEmpty()
         && cacheCutoffTimer.startedAfter(tl.getCreationTimer()))) {
 
       // not in cache OR the cutoff timer was started after when the cached 
entry timer was started,
       // so obtain info from metadata table
-      if (lock) {
-        wLock.lock();
-        try {
-          tl = lookupTabletLocationAndCheckLock(context, row, retry, 
lcSession);
-        } finally {
-          wLock.unlock();
-        }
-      } else {
-        tl = lookupTabletLocationAndCheckLock(context, row, retry, lcSession);
-      }
+      tl = lookupTabletLocationAndCheckLock(context, row, lcSession);
+
     }
 
     return tl;
   }
 
   private CachedTablet lookupTabletLocationAndCheckLock(ClientContext context, 
Text row,
-      boolean retry, LockCheckerSession lcSession) throws AccumuloException,
-      AccumuloSecurityException, TableNotFoundException, 
InvalidTabletHostingRequestException {
-    lookupTablet(context, row, retry, lcSession);
-    return lcSession.checkLock(findTabletInCache(row));
-  }
-
-  private CachedTablet processInvalidatedAndCheckLock(ClientContext context,
-      LockCheckerSession lcSession, Text row) throws 
AccumuloSecurityException, AccumuloException,
+      LockCheckerSession lcSession) throws AccumuloException, 
AccumuloSecurityException,
       TableNotFoundException, InvalidTabletHostingRequestException {
-    processInvalidated(context, lcSession);
+    lookupTablet(context, row, lcSession);
     return lcSession.checkLock(findTabletInCache(row));
   }
 
-  @SuppressFBWarnings(value = {"UL_UNRELEASED_LOCK", 
"UL_UNRELEASED_LOCK_EXCEPTION_PATH"},
-      justification = "locking is confusing, but probably correct")
-  private void processInvalidated(ClientContext context, LockCheckerSession 
lcSession)
-      throws AccumuloSecurityException, AccumuloException, 
TableNotFoundException,
-      InvalidTabletHostingRequestException {
-
-    if (badExtents.isEmpty()) {
-      return;
-    }
-
-    final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
-    try {
-      if (!writeLockHeld) {
-        rLock.unlock();
-        wLock.lock();
-        if (badExtents.isEmpty()) {
-          return;
-        }
-      }
-
-      List<Range> lookups = new ArrayList<>(badExtents.size());
-
-      for (KeyExtent be : badExtents) {
-        lookups.add(be.toMetaRange());
-        removeOverlapping(metaCache, be);
-      }
-
-      lookups = Range.mergeOverlapping(lookups);
-
-      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
-
-      parent.findTablets(context, lookups,
-          (cachedTablet, range) -> addRange(binnedRanges, cachedTablet, range),
-          LocationNeed.REQUIRED);
-
-      // randomize server order
-      ArrayList<String> tabletServers = new ArrayList<>(binnedRanges.keySet());
-      Collections.shuffle(tabletServers);
-
-      for (String tserver : tabletServers) {
-        List<CachedTablet> locations =
-            tabletObtainer.lookupTablets(context, tserver, 
binnedRanges.get(tserver), parent);
-
-        for (CachedTablet cachedTablet : locations) {
-          updateCache(cachedTablet, lcSession);
-        }
-      }
-    } finally {
-      if (!writeLockHeld) {
-        rLock.lock();
-        wLock.unlock();
-      }
-    }
-  }
-
   static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, 
CachedTablet ct,
       Range range) {
     binnedRanges.computeIfAbsent(ct.getTserverLocation().orElseThrow(), k -> 
new HashMap<>())
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataCachedTabletObtainer.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataCachedTabletObtainer.java
index 00a7739716..0ddc12701c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataCachedTabletObtainer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataCachedTabletObtainer.java
@@ -21,10 +21,8 @@ package org.apache.accumulo.core.metadata;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,18 +34,13 @@ import java.util.TreeSet;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.clientImpl.AccumuloServerException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.clientImpl.ClientTabletCache;
 import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
 import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablets;
 import 
org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.CachedTabletObtainer;
-import org.apache.accumulo.core.clientImpl.ScannerOptions;
 import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
-import org.apache.accumulo.core.clientImpl.TabletServerBatchReaderIterator;
-import 
org.apache.accumulo.core.clientImpl.TabletServerBatchReaderIterator.ResultReceiver;
 import org.apache.accumulo.core.clientImpl.ThriftScanner;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
@@ -71,7 +64,6 @@ public class MetadataCachedTabletObtainer implements 
CachedTabletObtainer {
   private static final Logger log = 
LoggerFactory.getLogger(MetadataCachedTabletObtainer.class);
 
   private final SortedSet<Column> locCols;
-  private final ArrayList<Column> columns;
 
   public MetadataCachedTabletObtainer() {
 
@@ -80,12 +72,11 @@ public class MetadataCachedTabletObtainer implements 
CachedTabletObtainer {
     locCols.add(TabletColumnFamily.PREV_ROW_COLUMN.toColumn());
     locCols.add(TabletColumnFamily.AVAILABILITY_COLUMN.toColumn());
     locCols.add(TabletColumnFamily.REQUESTED_COLUMN.toColumn());
-    columns = new ArrayList<>(locCols);
   }
 
   @Override
-  public CachedTablets lookupTablet(ClientContext context, CachedTablet src, 
Text row, Text stopRow,
-      ClientTabletCache parent) throws AccumuloSecurityException, 
AccumuloException {
+  public CachedTablets lookupTablet(ClientContext context, CachedTablet src, 
Text row, Text stopRow)
+      throws AccumuloSecurityException, AccumuloException {
 
     try {
 
@@ -147,7 +138,6 @@ public class MetadataCachedTabletObtainer implements 
CachedTabletObtainer {
       if (log.isTraceEnabled()) {
         log.trace("{} lookup failed", src.getExtent().tableId(), e);
       }
-      parent.invalidateCache(src.getExtent());
     }
 
     return null;
@@ -164,60 +154,6 @@ public class MetadataCachedTabletObtainer implements 
CachedTabletObtainer {
     }
   }
 
-  private static class SettableScannerOptions extends ScannerOptions {
-    public ScannerOptions setColumns(SortedSet<Column> locCols) {
-      this.fetchedColumns = locCols;
-      // see comment in lookupTablet about why iterator is used
-      addScanIterator(new IteratorSetting(10000, "WRI", 
WholeRowIterator.class.getName()));
-      return this;
-    }
-  }
-
-  @Override
-  public List<CachedTablet> lookupTablets(ClientContext context, String 
tserver,
-      Map<KeyExtent,List<Range>> tabletsRanges, ClientTabletCache parent)
-      throws AccumuloSecurityException, AccumuloException {
-
-    final TreeMap<Key,Value> results = new TreeMap<>();
-
-    ResultReceiver rr = entries -> {
-      for (Entry<Key,Value> entry : entries) {
-        try {
-          results.putAll(WholeRowIterator.decodeRow(entry.getKey(), 
entry.getValue()));
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-      }
-    };
-
-    ScannerOptions opts = null;
-    try (SettableScannerOptions unsetOpts = new SettableScannerOptions()) {
-      opts = unsetOpts.setColumns(locCols);
-    }
-
-    Map<KeyExtent,List<Range>> unscanned = new HashMap<>();
-    Map<KeyExtent,List<Range>> failures = new HashMap<>();
-    try {
-      TabletServerBatchReaderIterator.doLookup(context, tserver, 
tabletsRanges, failures, unscanned,
-          rr, columns, opts, Authorizations.EMPTY);
-      if (!failures.isEmpty()) {
-        // invalidate extents in parents cache
-        if (log.isTraceEnabled()) {
-          log.trace("lookupTablets failed for {} extents", failures.size());
-        }
-        parent.invalidateCache(failures.keySet());
-      }
-    } catch (IOException e) {
-      log.trace("lookupTablets failed server={}", tserver, e);
-      parent.invalidateCache(tabletsRanges.keySet());
-    } catch (AccumuloServerException e) {
-      log.trace("lookupTablets failed server={}", tserver, e);
-      throw e;
-    }
-
-    return 
MetadataCachedTabletObtainer.getMetadataLocationEntries(results).getCachedTablets();
-  }
-
   public static CachedTablets getMetadataLocationEntries(SortedMap<Key,Value> 
entries) {
     Text location = null;
     Text session = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LockMap.java 
b/core/src/main/java/org/apache/accumulo/core/util/LockMap.java
new file mode 100644
index 0000000000..da87a92b68
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/LockMap.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages lock acquisition and disposal for keys of type T by returning a 
distinct per-key lock
+ * upon request, and automatically disposing of it when no locks are needed 
for that key. Callers
+ * should acquire the lock from this class in a try-with-resources block so 
that it is automatically
+ * closed when the caller has no need of the lock.
+ */
+public class LockMap<T> {
+
+  // This class relies on the atomic nature of the ConcurrentHashMap compute 
function to track the
+  // number of references for each lock. Not all concurrent map 
implementations have an atomic
+  // compute function.
+  private final ConcurrentHashMap<T,PerKeyLockImpl> locks = new 
ConcurrentHashMap<>();
+
+  public interface PerKeyLock extends AutoCloseable {
+
+    /**
+     * Releases the per-key lock and, if no other thread is using the lock, 
disposes of it.
+     */
+    @Override
+    void close();
+  }
+
+  private class PerKeyLockImpl implements PerKeyLock {
+    private final Lock lock = new ReentrantLock();
+    // This variable is only read or written inside synchronized blocks inside 
ConcurrentHashMap and
+    // therefore does not need to be volatile for visibility between threads.
+    private int refCount = 1;
+    private final T key;
+
+    private PerKeyLockImpl(T key) {
+      this.key = key;
+    }
+
+    @Override
+    public void close() {
+      lock.unlock();
+      returnLock(this);
+    }
+  }
+
+  /**
+   * Finds an existing per-key lock object or creates a new one if none 
exists, then waits until the
+   * lock is acquired. Will never create more than one lock for the same key 
at the same time.
+   * Callers should call this method in a try-with-resources block, since 
closing the lock releases
+   * it.
+   */
+  public PerKeyLock lock(T key) {
+    Objects.requireNonNull(key);
+    var perKeyLock = getOrCreateLock(key);
+    perKeyLock.lock.lock();
+    return perKeyLock;
+  }
+
+  private PerKeyLockImpl getOrCreateLock(T key) {
+    // Create a lock for the key as needed. Assuming only one thread will 
execute the compute
+    // function per key.
+    var rcl = locks.compute(key, (k, v) -> {
+      if (v == null) {
+        return new PerKeyLockImpl(key);
+      } else {
+        Preconditions.checkState(v.refCount > 0);
+        v.refCount++;
+        return v;
+      }
+    });
+    return rcl;
+  }
+
+  private void returnLock(PerKeyLockImpl rcl) {
+    // Dispose of the lock if nothing else is using it. Assuming only one 
thread will execute the
+    // compute function per key.
+    locks.compute(rcl.key, (k, v) -> {
+      Objects.requireNonNull(v);
+      Preconditions.checkState(v.refCount > 0);
+      // while the ref count was >0 the reference should not have changed in 
the map
+      Preconditions.checkState(v == rcl);
+      v.refCount--;
+      if (v.refCount == 0) {
+        // No threads are using the lock anymore, so dispose of it
+        return null;
+      } else {
+        return v;
+      }
+    });
+  }
+
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
 
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
index 601dff11bb..e303a449cb 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
@@ -32,12 +32,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -498,7 +504,7 @@ public class ClientTabletCacheImplTest {
 
   static class TServers {
     private final Map<String,Map<KeyExtent,SortedMap<Key,Value>>> tservers = 
new HashMap<>();
-    private BiConsumer<CachedTablet,Text> lookupConsumer = (cachedTablet, row) 
-> {};
+    private volatile BiConsumer<CachedTablet,Text> lookupConsumer = 
(cachedTablet, row) -> {};
   }
 
   static class TestCachedTabletObtainer implements CachedTabletObtainer {
@@ -513,7 +519,7 @@ public class ClientTabletCacheImplTest {
 
     @Override
     public CachedTablets lookupTablet(ClientContext context, CachedTablet src, 
Text row,
-        Text stopRow, ClientTabletCache parent) {
+        Text stopRow) {
 
       lookupConsumer.accept(src, row);
 
@@ -521,14 +527,12 @@ public class ClientTabletCacheImplTest {
           tservers.get(src.getTserverLocation().orElseThrow());
 
       if (tablets == null) {
-        parent.invalidateCache(src.getExtent());
         return null;
       }
 
       SortedMap<Key,Value> tabletData = tablets.get(src.getExtent());
 
       if (tabletData == null) {
-        parent.invalidateCache(src.getExtent());
         return null;
       }
 
@@ -542,60 +546,6 @@ public class ClientTabletCacheImplTest {
 
       return MetadataCachedTabletObtainer.getMetadataLocationEntries(results);
     }
-
-    @Override
-    public List<CachedTablet> lookupTablets(ClientContext context, String 
tserver,
-        Map<KeyExtent,List<Range>> map, ClientTabletCache parent) {
-
-      ArrayList<CachedTablet> list = new ArrayList<>();
-
-      Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.get(tserver);
-
-      if (tablets == null) {
-        parent.invalidateCache(map.keySet());
-        return list;
-      }
-
-      TreeMap<Key,Value> results = new TreeMap<>();
-
-      Set<Entry<KeyExtent,List<Range>>> es = map.entrySet();
-      List<KeyExtent> failures = new ArrayList<>();
-      for (Entry<KeyExtent,List<Range>> entry : es) {
-        SortedMap<Key,Value> tabletData = tablets.get(entry.getKey());
-
-        if (tabletData == null) {
-          failures.add(entry.getKey());
-          continue;
-        }
-        List<Range> ranges = entry.getValue();
-        for (Range range : ranges) {
-          SortedMap<Key,Value> tm;
-          if (range.getStartKey() == null) {
-            tm = tabletData;
-          } else {
-            tm = tabletData.tailMap(range.getStartKey());
-          }
-
-          for (Entry<Key,Value> de : tm.entrySet()) {
-            if (range.afterEndKey(de.getKey())) {
-              break;
-            }
-
-            if (range.contains(de.getKey())) {
-              results.put(de.getKey(), de.getValue());
-            }
-          }
-        }
-      }
-
-      if (!failures.isEmpty()) {
-        parent.invalidateCache(failures);
-      }
-
-      return 
MetadataCachedTabletObtainer.getMetadataLocationEntries(results).getCachedTablets();
-
-    }
-
   }
 
   static class YesLockChecker implements TabletServerLockChecker {
@@ -839,6 +789,9 @@ public class ClientTabletCacheImplTest {
     tab1TabletCache.invalidateCache(tab1e21);
     tab1TabletCache.invalidateCache(tab1e22);
 
+    // parent cache has incorrect entry, the first attempt will fail and clear 
that, should work on
+    // 2nd attempt
+    locateTabletTest(tab1TabletCache, "a", null, null);
     locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver7");
     locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8");
     locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9");
@@ -1971,4 +1924,281 @@ public class ClientTabletCacheImplTest {
         List.of(range3)), seen);
     assertEquals(0, failures.size());
   }
+
+  @Test
+  public void testMultithreadedLookups() throws Exception {
+
+    // This test ensures that when multiple threads all attempt to look up data 
that is not currently
+    // in the cache, the minimal number of metadata lookups is done. 
Should only see one
+    // concurrent lookup per metadata tablet and no more or less.
+    KeyExtent mte1 = new KeyExtent(AccumuloTable.METADATA.tableId(), new 
Text("foo;m"), null);
+    KeyExtent mte2 = new KeyExtent(AccumuloTable.METADATA.tableId(), null, new 
Text("foo;m"));
+
+    var ke1 = createNewKeyExtent("foo", "m", null);
+    var ke2 = createNewKeyExtent("foo", "q", "m");
+    var ke3 = createNewKeyExtent("foo", null, "q");
+
+    TServers tservers = new TServers();
+
+    List<KeyExtent> lookups = Collections.synchronizedList(new ArrayList<>());
+    AtomicInteger activeLookups = new AtomicInteger(0);
+    AtomicBoolean sawTwoActive = new AtomicBoolean(false);
+    tservers.lookupConsumer = (src, row) -> {
+      if (!src.getExtent().equals(ROOT_TABLE_EXTENT)) {
+        lookups.add(src.getExtent());
+        // increment the total number of threads currently doing metadata 
lookups
+        activeLookups.incrementAndGet();
+        try {
+          while (!sawTwoActive.get()) {
+            // sleep to simulate metadata lookup that takes time, this gives 
threads time to pile up
+            // waiting for the metadata lookup
+            Thread.sleep(100);
+            // wait until two threads are doing metadata lookups
+            if (activeLookups.get() == 2) {
+              sawTwoActive.set(true);
+            } else {
+              // Should never see more than two threads in here because the 
cache should not do more
+              // than one lookup per metadata tablet at a time and there are 
two metadata tablets.
+              assertTrue(activeLookups.get() <= 2);
+            }
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        } finally {
+          activeLookups.decrementAndGet();
+        }
+      }
+    };
+    TestCachedTabletObtainer ttlo = new TestCachedTabletObtainer(tservers);
+
+    RootClientTabletCache rtl = new TestRootClientTabletCache();
+
+    ClientTabletCacheImpl rootTabletCache = new ClientTabletCacheImpl(
+        AccumuloTable.METADATA.tableId(), rtl, ttlo, new YesLockChecker());
+    ClientTabletCacheImpl metaCache =
+        new ClientTabletCacheImpl(TableId.of("foo"), rootTabletCache, ttlo, 
new YesLockChecker());
+
+    setLocation(tservers, "tserver1", ROOT_TABLE_EXTENT, mte1, "tserver2");
+    setLocation(tservers, "tserver1", ROOT_TABLE_EXTENT, mte2, "tserver3");
+
+    setLocation(tservers, "tserver2", mte1, ke1, "tserver7");
+    setLocation(tservers, "tserver3", mte2, ke2, "tserver8");
+    setLocation(tservers, "tserver3", mte2, ke3, "tserver9");
+
+    var executor = Executors.newCachedThreadPool();
+    List<Future<CachedTablet>> futures = new ArrayList<>();
+
+    // start 64 threads all trying to lookup data in the cache, should see 
only two threads do a
+    // concurrent lookup in the metadata table and no more or less.
+    List<String> rowsToLookup = new ArrayList<>();
+
+    for (int i = 0; i < 64; i++) {
+      String lookup = (char) ('a' + (i % 26)) + "";
+      rowsToLookup.add(lookup);
+    }
+
+    Collections.shuffle(rowsToLookup);
+
+    for (var lookup : rowsToLookup) {
+      var future = executor.submit(() -> {
+        var loc = metaCache.findTablet(context, new Text(lookup), false, 
LocationNeed.REQUIRED);
+        if (lookup.compareTo("m") <= 0) {
+          assertEquals("tserver7", loc.getTserverLocation().orElseThrow());
+        } else if (lookup.compareTo("q") <= 0) {
+          assertEquals("tserver8", loc.getTserverLocation().orElseThrow());
+        } else {
+          assertEquals("tserver9", loc.getTserverLocation().orElseThrow());
+        }
+        return loc;
+      });
+      futures.add(future);
+    }
+
+    for (var future : futures) {
+      assertNotNull(future.get());
+    }
+
+    assertTrue(sawTwoActive.get());
+    // The second metadata tablet (mte2) contains two user tablets (ke2 and 
ke3). Depending on which
+    // of these two user tablets is looked up in the metadata table first, the test will 
see a total of 2 or 3
+    // lookups. If the location of ke2 is looked up first then it will get the 
locations of ke2 and
+    // ke3 from mte2 and put them in the cache. If the location of ke3 is 
looked up first then it
+    // will only get the location of ke3 from mte2 and not ke2.
+    assertTrue(lookups.size() == 2 || lookups.size() == 3, lookups::toString);
+    assertEquals(1, lookups.stream().filter(metadataExtent -> 
metadataExtent.equals(mte1)).count(),
+        lookups::toString);
+    var mte2Lookups =
+        lookups.stream().filter(metadataExtent -> 
metadataExtent.equals(mte2)).count();
+    assertTrue(mte2Lookups == 1 || mte2Lookups == 2, lookups::toString);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testNonBlocking() throws Exception {
+    // Tests that when two threads query the cache and one needs to read from 
metadata table and one
+    // does not, that the one that does not is not blocked. This test ensures 
that when data is in
+    // cache it can always be accessed immediately.
+
+    List<KeyExtent> lookups = new ArrayList<>();
+    TServers tservers = new TServers();
+    // This lock is used by the test to cause a metadata lookup to block
+    var lookupLock = new ReentrantLock();
+    tservers.lookupConsumer = (src, row) -> {
+      lookupLock.lock();
+      try {
+        lookups.add(src.getExtent());
+      } finally {
+        lookupLock.unlock();
+      }
+    };
+
+    ClientTabletCacheImpl metaCache = createLocators(tservers, "tserver1", 
"tserver2", "foo");
+
+    var ke1 = createNewKeyExtent("foo", "g", null);
+    var ke2 = createNewKeyExtent("foo", "m", "g");
+    var ke3 = createNewKeyExtent("foo", "r", "m");
+    var ke4 = createNewKeyExtent("foo", null, "r");
+
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke1, "tserver3");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke2, "tserver4");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke3, "tserver5");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke4, "tserver6");
+
+    assertEquals("tserver3",
+        metaCache.findTablet(context, new Text("a"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+    assertEquals("tserver4",
+        metaCache.findTablet(context, new Text("h"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+    assertEquals("tserver5",
+        metaCache.findTablet(context, new Text("n"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+    assertEquals("tserver6",
+        metaCache.findTablet(context, new Text("s"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+
+    // clear this extent from cache to cause it to be looked up again in the 
metadata table
+    metaCache.invalidateCache(ke2);
+
+    var executor = Executors.newCachedThreadPool();
+
+    // acquire this lock to simulate a metadata table lookup blocking
+    lookupLock.lock();
+
+    // verify test assumption
+    assertEquals(0, lookupLock.getQueueLength());
+
+    // start a background task that will read from the metadata table
+    var lookupFuture = executor
+        .submit(() -> metaCache.findTablet(context, new Text("h"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+
+    // wait for the background thread to get stuck waiting on the lock
+    while (lookupLock.getQueueLength() == 0) {
+      Thread.sleep(5);
+    }
+
+    // The lookup task should be blocked trying to get location from the 
metadata table
+    assertFalse(lookupFuture.isDone());
+    assertEquals(1, lookupLock.getQueueLength());
+
+    // should be able to get the tablet locations that are still in the cache 
w/o blocking
+    assertEquals("tserver3",
+        metaCache.findTablet(context, new Text("a"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+    assertEquals("tserver5",
+        metaCache.findTablet(context, new Text("n"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+    assertEquals("tserver6",
+        metaCache.findTablet(context, new Text("s"), false, 
LocationNeed.REQUIRED)
+            .getTserverLocation().orElseThrow());
+
+    // The lookup task should still be blocked
+    assertFalse(lookupFuture.isDone());
+    assertEquals(1, lookupLock.getQueueLength());
+
+    // allow the metadata lookup running in background thread to proceed.
+    lookupLock.unlock();
+
+    // The future should be able to run and complete
+    assertEquals("tserver4", lookupFuture.get());
+
+    // verify test assumptions
+    assertTrue(lookupFuture.isDone());
+    assertEquals(0, lookupLock.getQueueLength());
+  }
+
+  @Test
+  public void testInvalidation() throws Exception {
+    // Test invalidation methods on the cache. Other test methods call the 
invalidate methods and
+    // assume they work, but do not verify they are working as expected. Want to 
verify two things in this
+    // test. First verify that invalidation only invalidates what was 
requested and no more. Second
+    // verify that invalidation causes a metadata read on subsequent cache 
accesses.
+
+    List<String> lookups = new ArrayList<>();
+    TServers tservers = new TServers();
+    tservers.lookupConsumer = (src, row) -> {
+      if (src.getExtent().equals(METADATA_TABLE_EXTENT)) {
+        lookups.add(row.toString());
+      }
+    };
+
+    ClientTabletCacheImpl metaCache = createLocators(tservers, "tserver1", 
"tserver2", "foo");
+
+    var ke1 = createNewKeyExtent("foo", "g", null);
+    var ke2 = createNewKeyExtent("foo", "m", "g");
+    var ke3 = createNewKeyExtent("foo", "r", "m");
+    var ke4 = createNewKeyExtent("foo", null, "r");
+
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke1, "tserver3");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke2, "tserver4");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke3, "tserver5");
+    setLocation(tservers, "tserver2", METADATA_TABLE_EXTENT, ke4, "tserver3");
+
+    // Do cache lookups in reverse order to force a metadata lookup for each 
row. The cache will
+    // read ahead when doing metadata lookups and want to avoid that in this 
case.
+    Map<String,String> expectedLocations = new LinkedHashMap<>();
+    expectedLocations.put("s", "tserver3");
+    expectedLocations.put("n", "tserver5");
+    expectedLocations.put("h", "tserver4");
+    expectedLocations.put("a", "tserver3");
+
+    checkLocations(expectedLocations, metaCache);
+    assertEquals(Set.of("foo;s", "foo;n", "foo;h", "foo;a"), 
Set.copyOf(lookups));
+
+    // invalidate one extent from the metadata table, should only read that 
extent from metadata
+    // table
+    metaCache.invalidateCache(ke2);
+    lookups.clear();
+    checkLocations(expectedLocations, metaCache);
+    assertEquals(Set.of("foo;h"), Set.copyOf(lookups));
+
+    // invalidate two extents
+    metaCache.invalidateCache(List.of(ke2, ke4));
+    lookups.clear();
+    checkLocations(expectedLocations, metaCache);
+    assertEquals(Set.of("foo;h", "foo;s"), Set.copyOf(lookups));
+
+    // invalidate everything in cache
+    metaCache.invalidateCache();
+    lookups.clear();
+    checkLocations(expectedLocations, metaCache);
+    assertEquals(Set.of("foo;s", "foo;n", "foo;h", "foo;a"), 
Set.copyOf(lookups));
+
+    // when nothing was invalidated, should see no metadata lookups
+    lookups.clear();
+    checkLocations(expectedLocations, metaCache);
+    assertEquals(Set.of(), Set.copyOf(lookups));
+  }
+
+  private void checkLocations(Map<String,String> expectedLocations, 
ClientTabletCacheImpl metaCache)
+      throws Exception {
+    for (var entry : expectedLocations.entrySet()) {
+      String row = entry.getKey();
+      String location = entry.getValue();
+      assertEquals(location,
+          metaCache.findTablet(context, new Text(row), false, 
LocationNeed.REQUIRED)
+              .getTserverLocation().orElseThrow());
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java 
b/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java
new file mode 100644
index 0000000000..151e40ac7e
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+public class LockMapTest {
+
+  /**
+   * Verify locks are created as needed and disposed of when no longer in use.
+   */
+  @Test
+  public void testReferences() {
+    var lm = new LockMap<String>();
+
+    var perKeyLock1 = lm.lock("abc");
+    var perKeyLock2 = lm.lock("abc");
+    var perKeyLock3 = lm.lock("def");
+
+    assertSame(perKeyLock1, perKeyLock2);
+    assertNotSame(perKeyLock1, perKeyLock3);
+
+    // This should not dispose of the lock as there is still one use of it
+    perKeyLock1.close();
+
+    // verify the lock for abc was not disposed
+    var perKeyLock4 = lm.lock("abc");
+    assertSame(perKeyLock2, perKeyLock4);
+
+    // these locks should be disposed of as nothing will be using them after 
the close calls
+    perKeyLock2.close();
+    perKeyLock3.close();
+    perKeyLock4.close();
+
+    // should create new locks for the two keys
+    var perKeyLock5 = lm.lock("abc");
+    var perKeyLock6 = lm.lock("def");
+    assertNotSame(perKeyLock4, perKeyLock5);
+    assertNotSame(perKeyLock3, perKeyLock6);
+    assertNotSame(perKeyLock5, perKeyLock6);
+
+    perKeyLock5.close();
+    perKeyLock6.close();
+  }
+
+  @Test
+  public void testMultiClose() {
+    var lm = new LockMap<String>();
+
+    var perKeyLock1 = lm.lock("abc");
+
+    perKeyLock1.close();
+    assertThrows(IllegalMonitorStateException.class, perKeyLock1::close);
+  }
+
+  /**
+   * This test attempts to verify two things for a LockMap instance. First 
that only one thread at a time will
+   * ever be able to lock a given key. Second that different threads can lock 
different keys at the
+   * same time.
+   */
+  @Test
+  public void testConcurrency() throws Exception {
+    var executor = Executors.newFixedThreadPool(32);
+
+    try {
+      var lockMap = new LockMap<Integer>();
+      var random = LazySingletons.RANDOM.get();
+
+      var booleans = new AtomicBoolean[] {new AtomicBoolean(), new 
AtomicBoolean(),
+          new AtomicBoolean(), new AtomicBoolean(), new AtomicBoolean()};
+
+      var futures = new ArrayList<Future<Boolean>>();
+
+      var maxLocked = new AtomicLong(0);
+
+      for (int i = 0; i < 100; i++) {
+        int key = random.nextInt(booleans.length);
+        var future = executor.submit(() -> {
+          try (var unused = lockMap.lock(key)) {
+            var set1 = booleans[key].compareAndSet(false, true);
+            // maxLocked is used to check that at some point we see another 
thread w/ a lock for a
+            // different key.
+            do {
+              Thread.sleep(0, 100);
+            } while (maxLocked.accumulateAndGet(
+                Stream.of(booleans).filter(AtomicBoolean::get).count(), 
Long::max) < 2);
+            var set2 = booleans[key].compareAndSet(true, false);
+            // If only one thread executes per key, then set1 and set2 should 
always be true
+            return set1 && set2;
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        futures.add(future);
+      }
+
+      for (var future : futures) {
+        assertTrue(future.get());
+      }
+
+      assertTrue(maxLocked.get() >= 2 && maxLocked.get() <= 5);
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+}

Reply via email to