This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 56d49f15a0 improves fetching multiple extents in ample (#3349) 56d49f15a0 is described below commit 56d49f15a05db9a46dbceb845918497760601c11 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri May 5 16:00:26 2023 -0400 improves fetching multiple extents in ample (#3349) It's possible to pass a collection of extents to ample and get the metadata for those tablets. However, only metadata for the tablets that exist in the metadata is returned. Using this method correctly is tricky because it may in rare cases (like concurrent splits and merges) only return a subset of what was requested. This commit changes the ample interface to add handling for these missing extents by adding a Consumer<KeyExtent> to the interface to which missing extents are passed. Now it's no longer possible to use this ample functionality without considering this rare case. While making this change, one place was found where the missing extents were not being considered. 
--- .../core/metadata/schema/TabletsMetadata.java | 61 ++++++++++--- .../accumulo/coordinator/CompactionFinalizer.java | 6 +- .../org/apache/accumulo/tserver/ScanServer.java | 5 +- .../org/apache/accumulo/tserver/TabletServer.java | 15 +++- .../java/org/apache/accumulo/test/AmpleIT.java | 100 +++++++++++++++++++++ 5 files changed, 167 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 876e6ec584..2061c0a7b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -34,13 +34,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -94,6 +98,8 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private final List<Text> families = new ArrayList<>(); private final List<ColumnFQ> qualifiers = new ArrayList<>(); private Set<KeyExtent> extentsToFetch = null; + private boolean fetchTablets = false; + private Optional<Consumer<KeyExtent>> notFoundHandler; private Ample.DataLevel level; private String table; private Range range; @@ -104,7 +110,6 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private TableId tableId; private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE; private final AccumuloClient _client; - private Collection<KeyExtent> extents = null; 
Builder(AccumuloClient client) { this._client = client; @@ -112,7 +117,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable @Override public TabletsMetadata build() { - if (extents != null) { + if (fetchTablets) { // setting multiple extents with forTablets(extents) is mutually exclusive with these // single-tablet options checkState(range == null && table == null && level == null && !checkConsistency); @@ -120,7 +125,8 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } checkState((level == null) != (table == null), - "scanTable() cannot be used in conjunction with forLevel(), forTable() or forTablet()"); + "scanTable() cannot be used in conjunction with forLevel(), forTable() or forTablet() %s %s", + level, table); if (level == DataLevel.ROOT) { ClientContext ctx = ((ClientContext) _client); return new TabletsMetadata(getRootMetadata(ctx, readConsistency)); @@ -132,7 +138,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private TabletsMetadata buildExtents(AccumuloClient client) { Map<DataLevel,List<KeyExtent>> groupedExtents = - extents.stream().collect(groupingBy(ke -> DataLevel.of(ke.tableId()))); + extentsToFetch.stream().collect(groupingBy(ke -> DataLevel.of(ke.tableId()))); List<Iterable<TabletMetadata>> iterables = new ArrayList<>(); @@ -181,13 +187,33 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } } - return new TabletsMetadata(() -> { + if (notFoundHandler.isPresent()) { + HashSet<KeyExtent> extentsNotSeen = new HashSet<>(extentsToFetch); + + var tablets = iterables.stream().flatMap(i -> StreamSupport.stream(i.spliterator(), false)) + .filter(tabletMetadata -> extentsNotSeen.remove(tabletMetadata.getExtent())) + .collect(Collectors.toList()); + + extentsNotSeen.forEach(notFoundHandler.orElseThrow()); + for (AutoCloseable closable : closables) { - closable.close(); + try { + closable.close(); + } catch (Exception e) { 
+ throw new RuntimeException(e); + } } - }, () -> iterables.stream().flatMap(i -> StreamSupport.stream(i.spliterator(), false)) - .filter(tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent())) - .iterator()); + + return new TabletsMetadata(() -> {}, tablets); + } else { + return new TabletsMetadata(() -> { + for (AutoCloseable closable : closables) { + closable.close(); + } + }, () -> iterables.stream().flatMap(i -> StreamSupport.stream(i.spliterator(), false)) + .filter(tabletMetadata -> extentsToFetch.contains(tabletMetadata.getExtent())) + .iterator()); + } } @@ -253,7 +279,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable @Override public Options checkConsistency() { - checkState(extents == null, "Unable to check consistency of non-contiguous tablets"); + checkState(!fetchTablets, "Unable to check consistency of non-contiguous tablets"); this.checkConsistency = true; return this; } @@ -343,10 +369,12 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } @Override - public Options forTablets(Collection<KeyExtent> extents) { + public Options forTablets(Collection<KeyExtent> extents, + Optional<Consumer<KeyExtent>> notFoundHandler) { this.level = null; - this.extents = extents; this.extentsToFetch = Set.copyOf(extents); + this.notFoundHandler = Objects.requireNonNull(notFoundHandler); + this.fetchTablets = true; return this; } @@ -438,8 +466,15 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable /** * Get the tablet metadata for the given extents. This will only return tablets where the end * row and prev end row exactly match the given extents. + * + * @param notFoundConsumer if a consumer is present, the extents that do not exists in the + * metadata store are passed to the consumer. If the missing extents are not needed, then + * pass Optional.empty() and it will be more efficient. 
Computing the missing extents + * requires buffering all tablet metadata in memory before returning anything, when + * Optional.empty() is passed this buffering is not done. */ - Options forTablets(Collection<KeyExtent> extents); + Options forTablets(Collection<KeyExtent> extents, + Optional<Consumer<KeyExtent>> notFoundConsumer); /** * This method automatically determines where the metadata for the passed in table ID resides. diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 9bbf2cfb88..4176807caf 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -155,8 +156,9 @@ public class CompactionFinalizer { Map<KeyExtent,TabletMetadata> tabletsMetadata; var extents = batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList()); - try (TabletsMetadata tablets = context.getAmple().readTablets().forTablets(extents) - .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) { + try (TabletsMetadata tablets = + context.getAmple().readTablets().forTablets(extents, Optional.empty()) + .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) { tabletsMetadata = tablets.stream().collect(toMap(TabletMetadata::getExtent, identity())); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 
fa863394af..0648c18d98 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -149,8 +150,8 @@ public class ScanServer extends AbstractServer loadAll(Set<? extends KeyExtent> keys) { long t1 = System.currentTimeMillis(); @SuppressWarnings("unchecked") - var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys).build().stream() - .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm)); + var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys, Optional.empty()) + .build().stream().collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm)); long t2 = System.currentTimeMillis(); LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1); return tms; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 937ca24e24..dcdaa4c266 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -46,6 +46,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; @@ -777,10 +778,11 @@ public class TabletServer extends AbstractServer implements TabletHostingServer Duration duration; Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan"); try (Scope scope = mdScanSpan.makeCurrent()) { + List<KeyExtent> missingTablets = new ArrayList<>(); // gather metadata for all tablets readTablets() - try 
(TabletsMetadata tabletsMetadata = - getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet()) - .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { + try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets() + .forTablets(onlineTabletsSnapshot.keySet(), Optional.of(missingTablets::add)) + .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { duration = Duration.between(start, Instant.now()); log.debug("Metadata scan took {}ms for {} tablets read.", duration.toMillis(), onlineTabletsSnapshot.keySet().size()); @@ -792,6 +794,13 @@ public class TabletServer extends AbstractServer implements TabletHostingServer MetadataUpdateCount counter = updateCounts.get(extent); tablet.compareTabletInfo(counter, tabletMetadata); } + + for (var extent : missingTablets) { + Tablet tablet = onlineTabletsSnapshot.get(extent); + if (!tablet.isClosed()) { + log.error("Tablet {} is open but does not exist in metadata table.", extent); + } + } } } catch (Exception e) { log.error("Unable to complete verification of tablet metadata", e); diff --git a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java new file mode 100644 index 0000000000..fb78b7fad8 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static java.util.stream.Collectors.toSet; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Sets; + +public class AmpleIT extends AccumuloClusterHarness { + + private void runFetchTest(Ample ample, List<KeyExtent> extentsToFetch, Set<KeyExtent> expected, + Set<KeyExtent> expectMissing) { + // always run a test without a consumer for not seen tablets as this takes a different code path + var extentsSeen = ample.readTablets().forTablets(extentsToFetch, Optional.empty()).build() + .stream().map(TabletMetadata::getExtent).collect(toSet()); + assertEquals(expected, extentsSeen); + + HashSet<KeyExtent> extentsNotSeen = new HashSet<>(); + extentsSeen = ample.readTablets().forTablets(extentsToFetch, Optional.of(extentsNotSeen::add)) + 
.build().stream().map(TabletMetadata::getExtent).collect(toSet()); + assertEquals(expected, extentsSeen); + assertEquals(expectMissing, extentsNotSeen); + } + + @Test + public void testFetchMultipleExtents() throws Exception { + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("v"))); + NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); + c.tableOperations().create(table, ntc); + + var tableId = TableId.of(c.tableOperations().tableIdMap().get(table)); + + // extents that exist in the metadata table + KeyExtent ke1 = new KeyExtent(tableId, new Text("c"), null); + KeyExtent ke2 = new KeyExtent(tableId, new Text("f"), new Text("c")); + KeyExtent ke3 = new KeyExtent(tableId, new Text("v"), new Text("f")); + KeyExtent ke4 = new KeyExtent(tableId, null, new Text("v")); + + // extents that do not exist in the metadata table + KeyExtent ne1 = new KeyExtent(tableId, null, new Text("g")); + KeyExtent ne2 = new KeyExtent(tableId, new Text("e"), new Text("c")); + KeyExtent ne3 = new KeyExtent(tableId, new Text("b"), null); + KeyExtent ne4 = new KeyExtent(TableId.of(tableId.canonical() + "not"), new Text("c"), null); + + var ample = getServerContext().getAmple(); + + var toFetch = new ArrayList<KeyExtent>(); + + for (var existing : Sets.powerSet(Set.of(ke1, ke2, ke3, ke4))) { + for (var nonexisting : Sets.powerSet(Set.of(ne1, ne2, ne3, ne4))) { + toFetch.clear(); + toFetch.addAll(existing); + toFetch.addAll(nonexisting); + + // run test to ensure when ample fetches multiple extents it handles one that do exist in + // the metadata table and those that do not + runFetchTest(ample, toFetch, existing, nonexisting); + } + } + } + } +}