This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 56d49f15a0 improves fetching multiple extents in ample (#3349)
56d49f15a0 is described below

commit 56d49f15a05db9a46dbceb845918497760601c11
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri May 5 16:00:26 2023 -0400

    improves fetching multiple extents in ample (#3349)
    
    It's possible to pass a collection of extents to ample and get the
    metadata for those tablets.  However, only metadata for the tablets that
    exist in the metadata store is returned.  Using this method correctly is
    tricky because, in rare cases (like concurrent splits and merges), it may
    return only a subset of what was requested.  This commit changes the
    ample interface to handle these missing extents by adding a
    Consumer<KeyExtent> parameter to which missing extents are
    passed.  It's now no longer possible to use this ample functionality
    without considering this rare case.  While making this change, one
    place was found where the missing extents were not being considered.
---
 .../core/metadata/schema/TabletsMetadata.java      |  61 ++++++++++---
 .../accumulo/coordinator/CompactionFinalizer.java  |   6 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |   5 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  15 +++-
 .../java/org/apache/accumulo/test/AmpleIT.java     | 100 +++++++++++++++++++++
 5 files changed, 167 insertions(+), 20 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 876e6ec584..2061c0a7b5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -34,13 +34,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -94,6 +98,8 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private final List<Text> families = new ArrayList<>();
     private final List<ColumnFQ> qualifiers = new ArrayList<>();
     private Set<KeyExtent> extentsToFetch = null;
+    private boolean fetchTablets = false;
+    private Optional<Consumer<KeyExtent>> notFoundHandler;
     private Ample.DataLevel level;
     private String table;
     private Range range;
@@ -104,7 +110,6 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private TableId tableId;
     private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE;
     private final AccumuloClient _client;
-    private Collection<KeyExtent> extents = null;
 
     Builder(AccumuloClient client) {
       this._client = client;
@@ -112,7 +117,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
 
     @Override
     public TabletsMetadata build() {
-      if (extents != null) {
+      if (fetchTablets) {
         // setting multiple extents with forTablets(extents) is mutually 
exclusive with these
         // single-tablet options
         checkState(range == null && table == null && level == null && 
!checkConsistency);
@@ -120,7 +125,8 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
       }
 
       checkState((level == null) != (table == null),
-          "scanTable() cannot be used in conjunction with forLevel(), 
forTable() or forTablet()");
+          "scanTable() cannot be used in conjunction with forLevel(), 
forTable() or forTablet() %s %s",
+          level, table);
       if (level == DataLevel.ROOT) {
         ClientContext ctx = ((ClientContext) _client);
         return new TabletsMetadata(getRootMetadata(ctx, readConsistency));
@@ -132,7 +138,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private TabletsMetadata buildExtents(AccumuloClient client) {
 
       Map<DataLevel,List<KeyExtent>> groupedExtents =
-          extents.stream().collect(groupingBy(ke -> 
DataLevel.of(ke.tableId())));
+          extentsToFetch.stream().collect(groupingBy(ke -> 
DataLevel.of(ke.tableId())));
 
       List<Iterable<TabletMetadata>> iterables = new ArrayList<>();
 
@@ -181,13 +187,33 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
         }
       }
 
-      return new TabletsMetadata(() -> {
+      if (notFoundHandler.isPresent()) {
+        HashSet<KeyExtent> extentsNotSeen = new HashSet<>(extentsToFetch);
+
+        var tablets = iterables.stream().flatMap(i -> 
StreamSupport.stream(i.spliterator(), false))
+            .filter(tabletMetadata -> 
extentsNotSeen.remove(tabletMetadata.getExtent()))
+            .collect(Collectors.toList());
+
+        extentsNotSeen.forEach(notFoundHandler.orElseThrow());
+
         for (AutoCloseable closable : closables) {
-          closable.close();
+          try {
+            closable.close();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
         }
-      }, () -> iterables.stream().flatMap(i -> 
StreamSupport.stream(i.spliterator(), false))
-          .filter(tabletMetadata -> 
extentsToFetch.contains(tabletMetadata.getExtent()))
-          .iterator());
+
+        return new TabletsMetadata(() -> {}, tablets);
+      } else {
+        return new TabletsMetadata(() -> {
+          for (AutoCloseable closable : closables) {
+            closable.close();
+          }
+        }, () -> iterables.stream().flatMap(i -> 
StreamSupport.stream(i.spliterator(), false))
+            .filter(tabletMetadata -> 
extentsToFetch.contains(tabletMetadata.getExtent()))
+            .iterator());
+      }
 
     }
 
@@ -253,7 +279,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
 
     @Override
     public Options checkConsistency() {
-      checkState(extents == null, "Unable to check consistency of 
non-contiguous tablets");
+      checkState(!fetchTablets, "Unable to check consistency of non-contiguous 
tablets");
       this.checkConsistency = true;
       return this;
     }
@@ -343,10 +369,12 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     }
 
     @Override
-    public Options forTablets(Collection<KeyExtent> extents) {
+    public Options forTablets(Collection<KeyExtent> extents,
+        Optional<Consumer<KeyExtent>> notFoundHandler) {
       this.level = null;
-      this.extents = extents;
       this.extentsToFetch = Set.copyOf(extents);
+      this.notFoundHandler = Objects.requireNonNull(notFoundHandler);
+      this.fetchTablets = true;
       return this;
     }
 
@@ -438,8 +466,15 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     /**
      * Get the tablet metadata for the given extents. This will only return 
tablets where the end
      * row and prev end row exactly match the given extents.
+     *
+     * @param notFoundConsumer if a consumer is present, the extents that do 
not exists in the
+     *        metadata store are passed to the consumer. If the missing 
extents are not needed, then
+     *        pass Optional.empty() and it will be more efficient. Computing 
the missing extents
+     *        requires buffering all tablet metadata in memory before 
returning anything, when
+     *        Optional.empty() is passed this buffering is not done.
      */
-    Options forTablets(Collection<KeyExtent> extents);
+    Options forTablets(Collection<KeyExtent> extents,
+        Optional<Consumer<KeyExtent>> notFoundConsumer);
 
     /**
      * This method automatically determines where the metadata for the passed 
in table ID resides.
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 9bbf2cfb88..4176807caf 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -155,8 +156,9 @@ public class CompactionFinalizer {
 
         Map<KeyExtent,TabletMetadata> tabletsMetadata;
         var extents = 
batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList());
-        try (TabletsMetadata tablets = 
context.getAmple().readTablets().forTablets(extents)
-            .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, 
ColumnType.ECOMP).build()) {
+        try (TabletsMetadata tablets =
+            context.getAmple().readTablets().forTablets(extents, 
Optional.empty())
+                .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, 
ColumnType.ECOMP).build()) {
           tabletsMetadata = 
tablets.stream().collect(toMap(TabletMetadata::getExtent, identity()));
         }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index fa863394af..0648c18d98 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -149,8 +150,8 @@ public class ScanServer extends AbstractServer
         loadAll(Set<? extends KeyExtent> keys) {
       long t1 = System.currentTimeMillis();
       @SuppressWarnings("unchecked")
-      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) 
keys).build().stream()
-          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys, 
Optional.empty())
+          .build().stream().collect(Collectors.toMap(tm -> tm.getExtent(), tm 
-> tm));
       long t2 = System.currentTimeMillis();
       LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
       return tms;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 937ca24e24..dcdaa4c266 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -46,6 +46,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -777,10 +778,11 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
       Duration duration;
       Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan");
       try (Scope scope = mdScanSpan.makeCurrent()) {
+        List<KeyExtent> missingTablets = new ArrayList<>();
         // gather metadata for all tablets readTablets()
-        try (TabletsMetadata tabletsMetadata =
-            
getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
-                .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+        try (TabletsMetadata tabletsMetadata = 
getContext().getAmple().readTablets()
+            .forTablets(onlineTabletsSnapshot.keySet(), 
Optional.of(missingTablets::add))
+            .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
           duration = Duration.between(start, Instant.now());
           log.debug("Metadata scan took {}ms for {} tablets read.", 
duration.toMillis(),
               onlineTabletsSnapshot.keySet().size());
@@ -792,6 +794,13 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
             MetadataUpdateCount counter = updateCounts.get(extent);
             tablet.compareTabletInfo(counter, tabletMetadata);
           }
+
+          for (var extent : missingTablets) {
+            Tablet tablet = onlineTabletsSnapshot.get(extent);
+            if (!tablet.isClosed()) {
+              log.error("Tablet {} is open but does not exist in metadata 
table.", extent);
+            }
+          }
         }
       } catch (Exception e) {
         log.error("Unable to complete verification of tablet metadata", e);
diff --git a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java 
b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
new file mode 100644
index 0000000000..fb78b7fad8
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.util.stream.Collectors.toSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Sets;
+
+public class AmpleIT extends AccumuloClusterHarness {
+
+  private void runFetchTest(Ample ample, List<KeyExtent> extentsToFetch, 
Set<KeyExtent> expected,
+      Set<KeyExtent> expectMissing) {
+    // always run a test without a consumer for not seen tablets as this takes 
a different code path
+    var extentsSeen = ample.readTablets().forTablets(extentsToFetch, 
Optional.empty()).build()
+        .stream().map(TabletMetadata::getExtent).collect(toSet());
+    assertEquals(expected, extentsSeen);
+
+    HashSet<KeyExtent> extentsNotSeen = new HashSet<>();
+    extentsSeen = ample.readTablets().forTablets(extentsToFetch, 
Optional.of(extentsNotSeen::add))
+        .build().stream().map(TabletMetadata::getExtent).collect(toSet());
+    assertEquals(expected, extentsSeen);
+    assertEquals(expectMissing, extentsNotSeen);
+  }
+
+  @Test
+  public void testFetchMultipleExtents() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new 
Text("f"), new Text("v")));
+      NewTableConfiguration ntc = new 
NewTableConfiguration().withSplits(splits);
+      c.tableOperations().create(table, ntc);
+
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
+
+      // extents that exist in the metadata table
+      KeyExtent ke1 = new KeyExtent(tableId, new Text("c"), null);
+      KeyExtent ke2 = new KeyExtent(tableId, new Text("f"), new Text("c"));
+      KeyExtent ke3 = new KeyExtent(tableId, new Text("v"), new Text("f"));
+      KeyExtent ke4 = new KeyExtent(tableId, null, new Text("v"));
+
+      // extents that do not exist in the metadata table
+      KeyExtent ne1 = new KeyExtent(tableId, null, new Text("g"));
+      KeyExtent ne2 = new KeyExtent(tableId, new Text("e"), new Text("c"));
+      KeyExtent ne3 = new KeyExtent(tableId, new Text("b"), null);
+      KeyExtent ne4 = new KeyExtent(TableId.of(tableId.canonical() + "not"), 
new Text("c"), null);
+
+      var ample = getServerContext().getAmple();
+
+      var toFetch = new ArrayList<KeyExtent>();
+
+      for (var existing : Sets.powerSet(Set.of(ke1, ke2, ke3, ke4))) {
+        for (var nonexisting : Sets.powerSet(Set.of(ne1, ne2, ne3, ne4))) {
+          toFetch.clear();
+          toFetch.addAll(existing);
+          toFetch.addAll(nonexisting);
+
+          // run test to ensure when ample fetches multiple extents it handles 
one that do exist in
+          // the metadata table and those that do not
+          runFetchTest(ample, toFetch, existing, nonexisting);
+        }
+      }
+    }
+  }
+}

Reply via email to