This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new c21766f0f4 Modified ScanServer to correctly handle tablet failures (#3150)
c21766f0f4 is described below

commit c21766f0f40d9e4e9e0334736fcfef7b5069847f
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Jan 17 10:58:23 2023 -0500

    Modified ScanServer to correctly handle tablet failures (#3150)
    
    The ScanServer was throwing a NotServingTabletException in both the
    scan and batch scan cases when there was a failure in loading the
    tablet. Modified the batch scan case to return the failed tablets
    in the response instead of throwing the exception.
    
    Closes #3144
---
 .../org/apache/accumulo/tserver/ScanServer.java    | 72 ++++++++++++++----
 .../apache/accumulo/tserver/ScanServerTest.java    | 86 +++++++++++++++++++---
 2 files changed, 133 insertions(+), 25 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index bd45018f8d..bdbbc541a1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -444,16 +444,22 @@ public class ScanServer extends AbstractServer
     private final Collection<StoredTabletFile> files;
     private final long myReservationId;
     private final Map<KeyExtent,TabletMetadata> tabletsMetadata;
+    private final Map<TKeyExtent,List<TRange>> failures;
 
-    ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long 
myReservationId) {
+    /* This constructor is called when starting a scan */
+    ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long 
myReservationId,
+        Map<TKeyExtent,List<TRange>> failures) {
       this.tabletsMetadata = tabletsMetadata;
+      this.failures = failures;
       this.files = tabletsMetadata.values().stream().flatMap(tm -> 
tm.getFiles().stream())
           .collect(Collectors.toUnmodifiableSet());
       this.myReservationId = myReservationId;
     }
 
+    /* This constructor is called when continuing a scan */
     ScanReservation(Collection<StoredTabletFile> files, long myReservationId) {
       this.tabletsMetadata = null;
+      this.failures = null;
       this.files = files;
       this.myReservationId = myReservationId;
     }
@@ -462,6 +468,14 @@ public class ScanServer extends AbstractServer
       return tabletsMetadata.get(extent);
     }
 
+    public Set<KeyExtent> getTabletMetadataExtents() {
+      return tabletsMetadata.keySet();
+    }
+
+    public Map<TKeyExtent,List<TRange>> getFailures() {
+      return this.failures;
+    }
+
     SnapshotTablet newTablet(ScanServer server, KeyExtent extent) throws 
IOException {
       var tabletMetadata = getTabletMetadata(extent);
       TabletResourceManager trm =
@@ -488,8 +502,12 @@ public class ScanServer extends AbstractServer
     }
   }
 
+  /*
+   * All extents passed in should end up in either the returned map or the 
failures set, but no
+   * extent should be in both.
+   */
   private Map<KeyExtent,TabletMetadata> 
reserveFilesInner(Collection<KeyExtent> extents,
-      long myReservationId) throws NotServingTabletException, 
AccumuloException {
+      long myReservationId, Set<KeyExtent> failures) throws AccumuloException {
     // RFS is an acronym for Reference files for scan
     LOG.debug("RFFS {} ensuring files are referenced for scan of extents {}", 
myReservationId,
         extents);
@@ -500,13 +518,14 @@ public class ScanServer extends AbstractServer
       var tabletMetadata = tabletsMetadata.get(extent);
       if (tabletMetadata == null) {
         LOG.info("RFFS {} extent not found in metadata table {}", 
myReservationId, extent);
-        throw new NotServingTabletException(extent.toThrift());
+        failures.add(extent);
       }
 
       if (!AssignmentHandler.checkTabletMetadata(extent, null, tabletMetadata, 
true)) {
         LOG.info("RFFS {} extent unable to load {} as AssignmentHandler 
returned false",
             myReservationId, extent);
-        throw new NotServingTabletException(extent.toThrift());
+        failures.add(extent);
+        tabletsMetadata.remove(extent);
       }
     }
 
@@ -592,11 +611,13 @@ public class ScanServer extends AbstractServer
             getContext().getAmple().deleteScanServerFileReferences(refs);
             LOG.info("RFFS {} extent unable to load {} as metadata no longer 
referencing files",
                 myReservationId, extent);
-            throw new NotServingTabletException(extent.toThrift());
+            failures.add(extent);
+            tabletsMetadata.remove(extent);
+          } else {
+            // remove files that are still referenced
+            filesToReserve.removeAll(metadataAfter.getFiles());
           }
 
-          // remove files that are still referenced
-          filesToReserve.removeAll(metadataAfter.getFiles());
         }
 
         // if this is not empty it means some files that we reserved are no 
longer referenced by
@@ -633,17 +654,34 @@ public class ScanServer extends AbstractServer
     }
   }
 
-  protected ScanReservation reserveFiles(Collection<KeyExtent> extents)
-      throws NotServingTabletException, AccumuloException {
+  protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
+      throws AccumuloException {
 
     long myReservationId = nextScanReservationId.incrementAndGet();
 
-    Map<KeyExtent,TabletMetadata> tabletsMetadata = reserveFilesInner(extents, 
myReservationId);
+    Set<KeyExtent> failedReservations = new HashSet<>();
+    Map<KeyExtent,TabletMetadata> tabletsMetadata =
+        reserveFilesInner(extents.keySet(), myReservationId, 
failedReservations);
     while (tabletsMetadata == null) {
-      tabletsMetadata = reserveFilesInner(extents, myReservationId);
+      failedReservations.clear();
+      tabletsMetadata = reserveFilesInner(extents.keySet(), myReservationId, 
failedReservations);
+    }
+
+    // validate that the tablet metadata set and failure set are disjoint and 
that the
+    // tablet metadata set and failure set contain all of the extents
+    if (!Collections.disjoint(tabletsMetadata.keySet(), failedReservations)
+        || !extents.keySet().equals(Sets.union(tabletsMetadata.keySet(), 
failedReservations))) {
+      throw new IllegalStateException("bug in reserverFilesInner " + 
extents.keySet() + ","
+          + tabletsMetadata.keySet() + "," + failedReservations);
     }
 
-    return new ScanReservation(tabletsMetadata, myReservationId);
+    // Convert failures
+    Map<TKeyExtent,List<TRange>> failures = new HashMap<>();
+    failedReservations.forEach(extent -> {
+      failures.put(extent.toThrift(), extents.get(extent));
+    });
+
+    return new ScanReservation(tabletsMetadata, myReservationId, failures);
   }
 
   protected ScanReservation reserveFiles(long scanId) throws 
NoSuchScanIDException {
@@ -827,8 +865,12 @@ public class ScanServer extends AbstractServer
       TooManyFilesException, TSampleNotPresentException, TException {
 
     KeyExtent extent = getKeyExtent(textent);
+    try (ScanReservation reservation =
+        reserveFiles(Map.of(extent, Collections.singletonList(range)))) {
 
-    try (ScanReservation reservation = 
reserveFiles(Collections.singleton(extent))) {
+      if (reservation.getFailures().containsKey(textent)) {
+        throw new NotServingTabletException(extent.toThrift());
+      }
 
       TabletBase tablet = reservation.newTablet(this, extent);
 
@@ -881,10 +923,10 @@ public class ScanServer extends AbstractServer
       batch.put(extent, entry.getValue());
     }
 
-    try (ScanReservation reservation = reserveFiles(batch.keySet())) {
+    try (ScanReservation reservation = reserveFiles(batch)) {
 
       HashMap<KeyExtent,TabletBase> tablets = new HashMap<>();
-      batch.keySet().forEach(extent -> {
+      reservation.getTabletMetadataExtents().forEach(extent -> {
         try {
           tablets.put(extent, reservation.newTablet(this, extent));
         } catch (IOException e) {
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
index fdf9bbf439..a1564e7766 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
@@ -28,10 +28,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.clientImpl.thrift.TInfo;
@@ -65,7 +65,6 @@ public class ScanServerTest {
     private KeyExtent extent;
     private TabletResolver resolver;
     private ScanReservation reservation;
-    private boolean loadTabletFailure = false;
 
     protected TestScanServer(ScanServerOpts opts, String[] args) {
       super(opts, args);
@@ -93,11 +92,8 @@ public class ScanServerTest {
     }
 
     @Override
-    protected ScanReservation reserveFiles(Collection<KeyExtent> extents)
-        throws NotServingTabletException, AccumuloException {
-      if (loadTabletFailure) {
-        throw new NotServingTabletException();
-      }
+    protected ScanReservation reserveFiles(Map<KeyExtent,List<TRange>> extents)
+        throws AccumuloException {
       return reservation;
     }
 
@@ -130,6 +126,7 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    expect(reservation.getFailures()).andReturn(Map.of());
     expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
     reservation.close();
     reservation.close();
@@ -157,13 +154,15 @@ public class ScanServerTest {
   }
 
   @Test
-  public void testTabletLoadFailure() throws Exception {
+  public void testScanTabletLoadFailure() throws Exception {
     handler = createMock(ThriftScanClientHandler.class);
 
     TInfo tinfo = createMock(TInfo.class);
     TCredentials tcreds = createMock(TCredentials.class);
+    KeyExtent extent = createMock(KeyExtent.class);
     TKeyExtent textent = createMock(TKeyExtent.class);
     TRange trange = createMock(TRange.class);
+    List<TRange> ranges = new ArrayList<>();
     List<TColumn> tcols = new ArrayList<>();
     List<IterInfo> titer = new ArrayList<>();
     Map<String,Map<String,String>> ssio = new HashMap<>();
@@ -171,18 +170,22 @@ public class ScanServerTest {
     TSamplerConfiguration tsc = createMock(TSamplerConfiguration.class);
     String classLoaderContext = new String();
     Map<String,String> execHints = new HashMap<>();
+    ScanReservation reservation = createMock(ScanReservation.class);
 
     expect(handler.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, 
ssio, auths, false,
         false, 10, tsc, 30L, classLoaderContext, execHints, 0L))
         .andReturn(new InitialScan(15, null));
+    expect(reservation.getFailures()).andReturn(Map.of(textent, ranges));
+    reservation.close();
     expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult());
     handler.closeScan(tinfo, 15);
 
-    replay(handler);
+    replay(handler, reservation);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    ss.extent = extent;
     ss.delegate = handler;
-    ss.loadTabletFailure = true;
+    ss.reservation = reservation;
 
     assertThrows(NotServingTabletException.class, () -> {
       ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, 
auths, false, false, 10,
@@ -222,6 +225,8 @@ public class ScanServerTest {
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
     expect(reservation.newTablet(ss, extent)).andReturn(tablet);
+    expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent));
+    expect(reservation.getFailures()).andReturn(Map.of());
     reservation.close();
     reservation.close();
     expect(handler.startMultiScan(tinfo, tcreds, tcols, titer, batch, ssio, 
auths, false, tsc, 30L,
@@ -248,6 +253,67 @@ public class ScanServerTest {
     verify(handler);
   }
 
+  @Test
+  public void testBatchScanTabletLoadFailure() throws Exception {
+    handler = createMock(ThriftScanClientHandler.class);
+
+    TInfo tinfo = createMock(TInfo.class);
+    TCredentials tcreds = createMock(TCredentials.class);
+    List<TRange> ranges = new ArrayList<>();
+    KeyExtent extent = createMock(KeyExtent.class);
+    TKeyExtent textent = createMock(TKeyExtent.class);
+    ScanReservation reservation = createMock(ScanReservation.class);
+    SnapshotTablet tablet = createMock(SnapshotTablet.class);
+    Map<KeyExtent,List<TRange>> batch = new HashMap<>();
+    batch.put(extent, ranges);
+    List<TColumn> tcols = new ArrayList<>();
+    List<IterInfo> titer = new ArrayList<>();
+    Map<String,Map<String,String>> ssio = new HashMap<>();
+    List<ByteBuffer> auths = new ArrayList<>();
+    TSamplerConfiguration tsc = createMock(TSamplerConfiguration.class);
+    String classLoaderContext = new String();
+    Map<String,String> execHints = new HashMap<>();
+    Map<KeyExtent,TabletBase> tablets = new HashMap<>();
+    TabletResolver resolver = new TabletResolver() {
+      @Override
+      public TabletBase getTablet(KeyExtent extent) {
+        return tablets.get(extent);
+      }
+
+      @Override
+      public void close() {}
+    };
+
+    TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+    expect(reservation.newTablet(ss, extent)).andReturn(tablet);
+    expect(reservation.getTabletMetadataExtents()).andReturn(Set.of());
+    expect(reservation.getFailures()).andReturn(Map.of(textent, 
ranges)).anyTimes();
+    reservation.close();
+    reservation.close();
+    InitialMultiScan ims = new InitialMultiScan(15, null);
+    ims.setResult(new MultiScanResult());
+    expect(handler.startMultiScan(tinfo, tcreds, tcols, titer, batch, ssio, 
auths, false, tsc, 30L,
+        classLoaderContext, execHints, resolver, 0L)).andReturn(ims);
+    expect(handler.continueMultiScan(tinfo, 15, 0L)).andReturn(new 
MultiScanResult());
+    handler.closeMultiScan(tinfo, 15);
+
+    replay(reservation, handler);
+
+    ss.delegate = handler;
+    ss.extent = extent;
+    ss.resolver = resolver;
+    ss.reservation = reservation;
+    ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
+
+    Map<TKeyExtent,List<TRange>> extents = new HashMap<>();
+    extents.put(textent, ranges);
+    InitialMultiScan is = ss.startMultiScan(tinfo, tcreds, extents, tcols, 
titer, ssio, auths,
+        false, tsc, 30L, classLoaderContext, execHints, 0L);
+    assertEquals(15, is.getScanID());
+    assertEquals(0, is.getResult().getFailuresSize());
+
+  }
+
   @Test
   public void testBatchScanNoRanges() throws Exception {
     handler = createMock(ThriftScanClientHandler.class);

Reply via email to