This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new e62ff01808 Make bulk import wait for in-progress bulk imports (#4059) e62ff01808 is described below commit e62ff01808c1641ba6d67ca526d834db9dae38da Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Wed Dec 13 13:20:06 2023 -0500 Make bulk import wait for in-progress bulk imports (#4059) * Make bulk import wait for in-progress bulk imports --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../org/apache/accumulo/tserver/tablet/Tablet.java | 45 ++++++++++++++-------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 3f7eebe95f..6a646c4f57 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1744,18 +1744,41 @@ public class Tablet extends TabletBase { // Clients timeout and will think that this operation failed. // Don't do it if we spent too long waiting for the lock - long now = System.currentTimeMillis(); + long now = System.nanoTime(); synchronized (this) { if (isClosed()) { throw new IOException("tablet " + extent + " is closed"); } - // TODO check seems unneeded now - ACCUMULO-1291 - long lockWait = System.currentTimeMillis() - now; - if (lockWait - > getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) { - throw new IOException( - "Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet lock for " + extent); + long rpcTimeoutNanos = TimeUnit.MILLISECONDS.toNanos( + (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT) + * 1.1)); + + // wait for any files that are bulk importing up to the RPC timeout limit + while (!Collections.disjoint(bulkImporting, fileMap.keySet())) { + try { + wait(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + + long lockWait = System.nanoTime() - now; + if (lockWait > rpcTimeoutNanos) { + throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait) + + " seconds to get tablet lock for " + extent + " " + tid); + } + } + + // need to check this again because when wait is called above the lock is released. + if (isClosed()) { + throw new IOException("tablet " + extent + " is closed"); + } + + long lockWait = System.nanoTime() - now; + if (lockWait > rpcTimeoutNanos) { + throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait) + + " seconds to get tablet lock for " + extent + " " + tid); } List<TabletFile> alreadyImported = bulkImported.get(tid); @@ -1767,14 +1790,6 @@ public class Tablet extends TabletBase { } } - fileMap.keySet().removeIf(file -> { - if (bulkImporting.contains(file)) { - log.info("Ignoring import of bulk file currently importing: " + file); - return true; - } - return false; - }); - if (fileMap.isEmpty()) { return; }