This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 324f52b711 Handles exceptions during close (#3765)
324f52b711 is described below

commit 324f52b711db908f144c943bcf1bc077af73e39b
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Tue Oct 17 13:26:04 2023 -0400

    Handles exceptions during close (#3765)

    * Handles exceptions during close

    * Adds a uniqueID to the scanDataSource object to allow for better tracking.

    * Adds example of Exception suppressing.

    * Adds logging statement for missed exceptions

    Only a particular exception type is converted to a TException. Other exceptions could fail to be replicated to the client. To ensure behavior doesn't change, just log these exceptions at warn for now.

    ---------

    Co-authored-by: Ivan Bella <i...@bella.name>
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../accumulo/tserver/TabletClientHandler.java | 3 +
 .../accumulo/tserver/tablet/DatafileManager.java | 53 +++++-----
 .../accumulo/tserver/tablet/ScanDataSource.java | 108 +++++++++++++--------
 3 files changed, 99 insertions(+), 65 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 509dcffc70..acd0a2f4c3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -968,6 +968,9 @@ public class TabletClientHandler implements TabletClientService.Iface {
       return results;
     } catch (IOException ioe) {
       throw new TException(ioe);
+    } catch (Exception e) {
+      log.warn("Exception returned for conditionalUpdate {}", e);
+      throw e;
     } finally {
       writeTracker.finishWrite(opid);
       server.sessionManager.unreserveSession(sessID);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 0320d73b79..b2568f7ade 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -121,36 +121,43 @@ class DatafileManager { final Set<StoredTabletFile> filesToDelete = new HashSet<>(); - synchronized (tablet) { - Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); + try { + synchronized (tablet) { + Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); - if (absFilePaths == null) { - throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); - } + if (absFilePaths == null) { + throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); + } - boolean notify = false; - for (StoredTabletFile path : absFilePaths) { - long refCount = fileScanReferenceCounts.decrement(path, 1); - if (refCount == 0) { - if (filesToDeleteAfterScan.remove(path)) { - filesToDelete.add(path); + boolean notify = false; + try { + for (StoredTabletFile path : absFilePaths) { + long refCount = fileScanReferenceCounts.decrement(path, 1); + if (refCount == 0) { + if (filesToDeleteAfterScan.remove(path)) { + filesToDelete.add(path); + } + notify = true; + } else if (refCount < 0) { + throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + } + } finally { + if (notify) { + tablet.notifyAll(); } - notify = true; - } else if (refCount < 0) { - throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); } } - - if (notify) { - tablet.notifyAll(); + } finally { + // Remove scan files even if the loop above did not fully complete because once a + // file is in the set filesToDelete that means it was removed from filesToDeleteAfterScan + // and would never be added back. 
+ if (!filesToDelete.isEmpty()) { + log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); + MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), + tablet.getTabletServer().getLock()); } } - - if (!filesToDelete.isEmpty()) { - log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); - MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), - tablet.getTabletServer().getLock()); - } } void removeFilesAfterScan(Set<StoredTabletFile> scanFiles) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 51ada87625..f17d9fa57e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -61,6 +62,7 @@ class ScanDataSource implements DataSource { // data source state private final TabletBase tablet; private ScanFileManager fileManager; + private static AtomicLong nextSourceId = new AtomicLong(0); private SortedKeyValueIterator<Key,Value> iter; private long expectedDeletionCount; private List<MemoryIterator> memIters = null; @@ -71,6 +73,7 @@ class ScanDataSource implements DataSource { private final ScanParameters scanParams; private final boolean loadIters; private final byte[] defaultLabels; + private final long scanDataSourceId; ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean loadIters, AtomicBoolean interruptFlag) { @@ -80,35 +83,38 @@ class ScanDataSource implements DataSource { 
this.interruptFlag = interruptFlag; this.loadIters = loadIters; this.defaultLabels = tablet.getDefaultSecurityLabels(); - if (log.isTraceEnabled()) { - log.trace("new scan data source, tablet: {}, params: {}, loadIterators: {}", this.tablet, - this.scanParams, this.loadIters); - } + this.scanDataSourceId = nextSourceId.incrementAndGet(); + log.trace("new scan data source, scanId {}, tablet: {}, params: {}, loadIterators: {}", + this.scanDataSourceId, this.tablet, this.scanParams, this.loadIters); } @Override public DataSource getNewDataSource() { - if (isCurrent()) { - return this; - } else { - // log.debug("Switching data sources during a scan"); - if (memIters != null) { - tablet.returnMemIterators(memIters); - memIters = null; - tablet.returnFilesForScan(fileReservationId); - fileReservationId = -1; - } - - if (fileManager != null) { - tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); - fileManager.releaseOpenFiles(false); + if (!isCurrent()) { + Throwable thrownException = null; + try { + returnIterators(); + } catch (Exception e) { + thrownException = e; + throw e; + } finally { + try { + if (fileManager != null) { + tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); + fileManager.releaseOpenFiles(false); + } + } catch (Exception e) { + if (thrownException != null) { + e.addSuppressed(thrownException); + throw e; + } + } finally { + expectedDeletionCount = tablet.getDataSourceDeletions(); + iter = null; + } } - - expectedDeletionCount = tablet.getDataSourceDeletions(); - iter = null; - - return this; } + return this; } @Override @@ -149,6 +155,7 @@ class ScanDataSource implements DataSource { if (fileManager == null) { fileManager = tablet.getTabletResources().newScanFileManager(scanParams.getScanDispatch()); tablet.getScanMetrics().incrementOpenFiles(fileManager.getNumOpenFiles()); + log.trace("Adding active scan for {}, scanId:{}", tablet.getExtent(), scanDataSourceId); tablet.addActiveScans(this); } @@ 
-234,32 +241,49 @@ class ScanDataSource implements DataSource { } } - @Override - public void close(boolean sawErrors) { - + private void returnIterators() { if (memIters != null) { + log.trace("Returning mem iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), + scanDataSourceId, fileReservationId); tablet.returnMemIterators(memIters); memIters = null; - tablet.returnFilesForScan(fileReservationId); - fileReservationId = -1; - } - - synchronized (tablet) { - if (tablet.removeScan(this) == 0) { - tablet.notifyAll(); + try { + log.trace("Returning file iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), + scanDataSourceId, fileReservationId); + tablet.returnFilesForScan(fileReservationId); + } catch (Exception e) { + log.warn("Error Returning file iterators for scan: {}, :{}", scanDataSourceId, e); + // Continue bubbling the exception up for handling. + throw e; + } finally { + fileReservationId = -1; } } + } - if (fileManager != null) { - tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); - fileManager.releaseOpenFiles(sawErrors); - fileManager = null; - } - - if (statsIterator != null) { - statsIterator.report(); + @Override + public void close(boolean sawErrors) { + try { + returnIterators(); + } finally { + synchronized (tablet) { + log.trace("Removing active scan for {} scanID:{}", tablet.getExtent(), scanDataSourceId); + if (tablet.removeScan(this) == 0) { + tablet.notifyAll(); + } + } + try { + if (fileManager != null) { + tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); + fileManager.releaseOpenFiles(sawErrors); + } + } finally { + fileManager = null; + if (statsIterator != null) { + statsIterator.report(); + } + } } - } public void interrupt() {