This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 4d38127e5a3f10e404e90323c49cf146f4c70007
Merge: 1aa81d81b0 e62ff01808
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 13 18:34:15 2023 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/tserver/tablet/Tablet.java | 45 ++++++++++++++--------
 1 file changed, 30 insertions(+), 15 deletions(-)

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 7c741729df,6a646c4f57..65db951634
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -1733,17 -1750,40 +1733,40 @@@ public class Tablet extends TabletBase 
          throw new IOException("tablet " + extent + " is closed");
        }
  
-       // TODO check seems unneeded now - ACCUMULO-1291
-       long lockWait = System.currentTimeMillis() - now;
-       if (lockWait
-           > getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) {
-         throw new IOException(
-             "Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet lock for " + extent);
+       long rpcTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(
+           (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)
+               * 1.1));
+ 
+       // wait for any files that are bulk importing up to the RPC timeout limit
+       while (!Collections.disjoint(bulkImporting, fileMap.keySet())) {
+         try {
+           wait(1_000);
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           throw new IllegalStateException(e);
+         }
+ 
+         long lockWait = System.nanoTime() - now;
+         if (lockWait > rpcTimeoutNanos) {
+           throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
+               + " seconds to get tablet lock for " + extent + " " + tid);
+         }
+       }
+ 
+       // need to check this again because when wait is called above the lock is released.
+       if (isClosed()) {
+         throw new IOException("tablet " + extent + " is closed");
+       }
+ 
+       long lockWait = System.nanoTime() - now;
+       if (lockWait > rpcTimeoutNanos) {
+         throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
+             + " seconds to get tablet lock for " + extent + " " + tid);
        }
  
 -      List<TabletFile> alreadyImported = bulkImported.get(tid);
 +      List<ReferencedTabletFile> alreadyImported = bulkImported.get(tid);
        if (alreadyImported != null) {
 -        for (TabletFile entry : alreadyImported) {
 +        for (ReferencedTabletFile entry : alreadyImported) {
            if (fileMap.remove(entry) != null) {
              log.trace("Ignoring import of bulk file already imported: {}", entry);
            }

Reply via email to