This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new c21766f0f4 Modified ScanServer to correctly handle tablet failures (#3150) c21766f0f4 is described below commit c21766f0f40d9e4e9e0334736fcfef7b5069847f Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Jan 17 10:58:23 2023 -0500 Modified ScanServer to correctly handle tablet failures (#3150) The ScanServer was throwing a NotServingTabletException in both the scan and batch scan cases when there was a failure in loading the tablet. Modified the batch scan case to return the failed tablets in the response instead of throwing the exception. Closes #3144 --- .../org/apache/accumulo/tserver/ScanServer.java | 72 ++++++++++++++---- .../apache/accumulo/tserver/ScanServerTest.java | 86 +++++++++++++++++++--- 2 files changed, 133 insertions(+), 25 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index bd45018f8d..bdbbc541a1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -444,16 +444,22 @@ public class ScanServer extends AbstractServer private final Collection<StoredTabletFile> files; private final long myReservationId; private final Map<KeyExtent,TabletMetadata> tabletsMetadata; + private final Map<TKeyExtent,List<TRange>> failures; - ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long myReservationId) { + /* This constructor is called when starting a scan */ + ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long myReservationId, + Map<TKeyExtent,List<TRange>> failures) { this.tabletsMetadata = tabletsMetadata; + this.failures = failures; this.files = tabletsMetadata.values().stream().flatMap(tm -> tm.getFiles().stream()) .collect(Collectors.toUnmodifiableSet()); this.myReservationId = myReservationId; } + /* This constructor is called when continuing a scan */ ScanReservation(Collection<StoredTabletFile> files, long myReservationId) { this.tabletsMetadata = null; + this.failures = null; this.files = files; this.myReservationId = myReservationId; } @@ -462,6 +468,14 @@ public class ScanServer extends AbstractServer return tabletsMetadata.get(extent); } + public Set<KeyExtent> getTabletMetadataExtents() { + return tabletsMetadata.keySet(); + } + + public Map<TKeyExtent,List<TRange>> getFailures() { + return this.failures; + } + SnapshotTablet newTablet(ScanServer server, KeyExtent extent) throws IOException { var tabletMetadata = getTabletMetadata(extent); TabletResourceManager trm = @@ -488,8 +502,12 @@ public class ScanServer extends AbstractServer } } + /* + * All extents passed in should end up in either the returned map or the failures set, but no + * extent should be in both. + */ private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> extents, - long myReservationId) throws NotServingTabletException, AccumuloException { + long myReservationId, Set<KeyExtent> failures) throws AccumuloException { // RFS is an acronym for Reference files for scan LOG.debug("RFFS {} ensuring files are referenced for scan of extents {}", myReservationId, extents); @@ -500,13 +518,14 @@ public class ScanServer extends AbstractServer var tabletMetadata = tabletsMetadata.get(extent); if (tabletMetadata == null) { LOG.info("RFFS {} extent not found in metadata table {}", myReservationId, extent); - throw new NotServingTabletException(extent.toThrift()); + failures.add(extent); } if (!AssignmentHandler.checkTabletMetadata(extent, null, tabletMetadata, true)) { LOG.info("RFFS {} extent unable to load {} as AssignmentHandler returned false", myReservationId, extent); - throw new NotServingTabletException(extent.toThrift()); + failures.add(extent); + tabletsMetadata.remove(extent); } } @@ -592,11 +611,13 @@ public class ScanServer extends AbstractServer getContext().getAmple().deleteScanServerFileReferences(refs); LOG.info("RFFS {} extent unable to load {} as metadata no longer referencing files", myReservationId, extent); - throw new NotServingTabletException(extent.toThrift()); + failures.add(extent); + tabletsMetadata.remove(extent); + } else { + // remove files that are still referenced + filesToReserve.removeAll(metadataAfter.getFiles()); } - // remove files that are still referenced - filesToReserve.removeAll(metadataAfter.getFiles()); } // if this is not empty it means some files that we reserved are no longer referenced by @@ -633,17 +654,34 @@ public class ScanServer extends AbstractServer } } - protected ScanReservation reserveFiles(Collection<KeyExtent> extents) - throws NotServingTabletException, AccumuloException { + protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents) + throws AccumuloException { long myReservationId = nextScanReservationId.incrementAndGet(); - Map<KeyExtent,TabletMetadata> tabletsMetadata = reserveFilesInner(extents, myReservationId); + Set<KeyExtent> failedReservations = new HashSet<>(); + Map<KeyExtent,TabletMetadata> tabletsMetadata = + reserveFilesInner(extents.keySet(), myReservationId, failedReservations); while (tabletsMetadata == null) { - tabletsMetadata = reserveFilesInner(extents, myReservationId); + failedReservations.clear(); + tabletsMetadata = reserveFilesInner(extents.keySet(), myReservationId, failedReservations); + } + + // validate that the tablet metadata set and failure set are disjoint and that the + // tablet metadata set and failure set contain all of the extents + if (!Collections.disjoint(tabletsMetadata.keySet(), failedReservations) + || !extents.keySet().equals(Sets.union(tabletsMetadata.keySet(), failedReservations))) { + throw new IllegalStateException("bug in reserverFilesInner " + extents.keySet() + "," + + tabletsMetadata.keySet() + "," + failedReservations); } - return new ScanReservation(tabletsMetadata, myReservationId); + // Convert failures + Map<TKeyExtent,List<TRange>> failures = new HashMap<>(); + failedReservations.forEach(extent -> { + failures.put(extent.toThrift(), extents.get(extent)); + }); + + return new ScanReservation(tabletsMetadata, myReservationId, failures); } protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException { @@ -827,8 +865,12 @@ public class ScanServer extends AbstractServer TooManyFilesException, TSampleNotPresentException, TException { KeyExtent extent = getKeyExtent(textent); + try (ScanReservation reservation = + reserveFiles(Map.of(extent, Collections.singletonList(range)))) { - try (ScanReservation reservation = reserveFiles(Collections.singleton(extent))) { + if (reservation.getFailures().containsKey(textent)) { + throw new NotServingTabletException(extent.toThrift()); + } TabletBase tablet = reservation.newTablet(this, extent); @@ -881,10 +923,10 @@ public class ScanServer extends AbstractServer batch.put(extent, entry.getValue()); } - try (ScanReservation reservation = reserveFiles(batch.keySet())) { + try (ScanReservation reservation = reserveFiles(batch)) { HashMap<KeyExtent,TabletBase> tablets = new HashMap<>(); - batch.keySet().forEach(extent -> { + reservation.getTabletMetadataExtents().forEach(extent -> { try { tablets.put(extent, reservation.newTablet(this, extent)); } catch (IOException e) { diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index fdf9bbf439..a1564e7766 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@ -28,10 +28,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.clientImpl.thrift.TInfo; @@ -65,7 +65,6 @@ public class ScanServerTest { private KeyExtent extent; private TabletResolver resolver; private ScanReservation reservation; - private boolean loadTabletFailure = false; protected TestScanServer(ScanServerOpts opts, String[] args) { super(opts, args); @@ -93,11 +92,8 @@ public class ScanServerTest { } @Override - protected ScanReservation reserveFiles(Collection<KeyExtent> extents) - throws NotServingTabletException, AccumuloException { - if (loadTabletFailure) { - throw new NotServingTabletException(); - } + protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents) + throws AccumuloException { return reservation; } @@ -130,6 +126,7 @@ public class ScanServerTest { TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + expect(reservation.getFailures()).andReturn(Map.of()); expect(reservation.newTablet(ss, sextent)).andReturn(tablet); reservation.close(); reservation.close(); @@ -157,13 +154,15 @@ public class ScanServerTest { } @Test - public void testTabletLoadFailure() throws Exception { + public void testScanTabletLoadFailure() throws Exception { handler = createMock(ThriftScanClientHandler.class); TInfo tinfo = createMock(TInfo.class); TCredentials tcreds = createMock(TCredentials.class); + KeyExtent extent = createMock(KeyExtent.class); TKeyExtent textent = createMock(TKeyExtent.class); TRange trange = createMock(TRange.class); + List<TRange> ranges = new ArrayList<>(); List<TColumn> tcols = new ArrayList<>(); List<IterInfo> titer = new ArrayList<>(); Map<String,Map<String,String>> ssio = new HashMap<>(); @@ -171,18 +170,22 @@ public class ScanServerTest { TSamplerConfiguration tsc = createMock(TSamplerConfiguration.class); String classLoaderContext = new String(); Map<String,String> execHints = new HashMap<>(); + ScanReservation reservation = createMock(ScanReservation.class); expect(handler.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, false, false, 10, tsc, 30L, classLoaderContext, execHints, 0L)) .andReturn(new InitialScan(15, null)); + expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)); + reservation.close(); expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult()); handler.closeScan(tinfo, 15); - replay(handler); + replay(handler, reservation); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + ss.extent = extent; ss.delegate = handler; - ss.loadTabletFailure = true; + ss.reservation = reservation; assertThrows(NotServingTabletException.class, () -> { ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, false, false, 10, @@ -222,6 +225,8 @@ public class ScanServerTest { TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); expect(reservation.newTablet(ss, extent)).andReturn(tablet); + expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent)); + expect(reservation.getFailures()).andReturn(Map.of()); reservation.close(); reservation.close(); expect(handler.startMultiScan(tinfo, tcreds, tcols, titer, batch, ssio, auths, false, tsc, 30L, @@ -248,6 +253,67 @@ public class ScanServerTest { verify(handler); } + @Test + public void testBatchScanTabletLoadFailure() throws Exception { + handler = createMock(ThriftScanClientHandler.class); + + TInfo tinfo = createMock(TInfo.class); + TCredentials tcreds = createMock(TCredentials.class); + List<TRange> ranges = new ArrayList<>(); + KeyExtent extent = createMock(KeyExtent.class); + TKeyExtent textent = createMock(TKeyExtent.class); + ScanReservation reservation = createMock(ScanReservation.class); + SnapshotTablet tablet = createMock(SnapshotTablet.class); + Map<KeyExtent,List<TRange>> batch = new HashMap<>(); + batch.put(extent, ranges); + List<TColumn> tcols = new ArrayList<>(); + List<IterInfo> titer = new ArrayList<>(); + Map<String,Map<String,String>> ssio = new HashMap<>(); + List<ByteBuffer> auths = new ArrayList<>(); + TSamplerConfiguration tsc = createMock(TSamplerConfiguration.class); + String classLoaderContext = new String(); + Map<String,String> execHints = new HashMap<>(); + Map<KeyExtent,TabletBase> tablets = new HashMap<>(); + TabletResolver resolver = new TabletResolver() { + @Override + public TabletBase getTablet(KeyExtent extent) { + return tablets.get(extent); + } + + @Override + public void close() {} + }; + + TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + expect(reservation.newTablet(ss, extent)).andReturn(tablet); + expect(reservation.getTabletMetadataExtents()).andReturn(Set.of()); + expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)).anyTimes(); + reservation.close(); + reservation.close(); + InitialMultiScan ims = new InitialMultiScan(15, null); + ims.setResult(new MultiScanResult()); + expect(handler.startMultiScan(tinfo, tcreds, tcols, titer, batch, ssio, auths, false, tsc, 30L, + classLoaderContext, execHints, resolver, 0L)).andReturn(ims); + expect(handler.continueMultiScan(tinfo, 15, 0L)).andReturn(new MultiScanResult()); + handler.closeMultiScan(tinfo, 15); + + replay(reservation, handler); + + ss.delegate = handler; + ss.extent = extent; + ss.resolver = resolver; + ss.reservation = reservation; + ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); + + Map<TKeyExtent,List<TRange>> extents = new HashMap<>(); + extents.put(textent, ranges); + InitialMultiScan is = ss.startMultiScan(tinfo, tcreds, extents, tcols, titer, ssio, auths, + false, tsc, 30L, classLoaderContext, execHints, 0L); + assertEquals(15, is.getScanID()); + assertEquals(0, is.getResult().getFailuresSize()); + + } + @Test public void testBatchScanNoRanges() throws Exception { handler = createMock(ThriftScanClientHandler.class);