This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e62ff01808 Make bulk import wait for in-progress bulk imports (#4059)
e62ff01808 is described below

commit e62ff01808c1641ba6d67ca526d834db9dae38da
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 13 13:20:06 2023 -0500

    Make bulk import wait for in-progress bulk imports (#4059)
    
    * Make bulk import wait for in-progress bulk imports
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 45 ++++++++++++++--------
 1 file changed, 30 insertions(+), 15 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 3f7eebe95f..6a646c4f57 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1744,18 +1744,41 @@ public class Tablet extends TabletBase {
 
     // Clients timeout and will think that this operation failed.
     // Don't do it if we spent too long waiting for the lock
-    long now = System.currentTimeMillis();
+    long now = System.nanoTime();
     synchronized (this) {
       if (isClosed()) {
         throw new IOException("tablet " + extent + " is closed");
       }
 
-      // TODO check seems unneeded now - ACCUMULO-1291
-      long lockWait = System.currentTimeMillis() - now;
-      if (lockWait
-          > getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) {
-        throw new IOException(
-            "Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet lock for " + extent);
+      long rpcTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(
+          (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)
+              * 1.1));
+
+      // wait for any files that are bulk importing up to the RPC timeout limit
+      while (!Collections.disjoint(bulkImporting, fileMap.keySet())) {
+        try {
+          wait(1_000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+
+        long lockWait = System.nanoTime() - now;
+        if (lockWait > rpcTimeoutNanos) {
+          throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
+              + " seconds to get tablet lock for " + extent + " " + tid);
+        }
+      }
+
+      // need to check this again because when wait is called above the lock is released.
+      if (isClosed()) {
+        throw new IOException("tablet " + extent + " is closed");
+      }
+
+      long lockWait = System.nanoTime() - now;
+      if (lockWait > rpcTimeoutNanos) {
+        throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
+            + " seconds to get tablet lock for " + extent + " " + tid);
       }
 
       List<TabletFile> alreadyImported = bulkImported.get(tid);
@@ -1767,14 +1790,6 @@ public class Tablet extends TabletBase {
         }
       }
 
-      fileMap.keySet().removeIf(file -> {
-        if (bulkImporting.contains(file)) {
-          log.info("Ignoring import of bulk file currently importing: " + file);
-          return true;
-        }
-        return false;
-      });
-
       if (fileMap.isEmpty()) {
         return;
       }

Reply via email to