This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f26c702a87 Clean up usage of TabletsMetadata (#3980)
f26c702a87 is described below
commit f26c702a87df996b6ccf21c3cef2bb5fd5e751e6
Author: Dom G <[email protected]>
AuthorDate: Wed Jan 10 16:16:36 2024 -0500
Clean up usage of TabletsMetadata (#3980)
* Clean up usage of TabletsMetadata. Close resources and make refactoring
improvements.
---
.../core/clientImpl/TableOperationsImpl.java | 62 ++++++++--------
.../clientImpl/bulk/ConcurrentKeyExtentCache.java | 17 ++---
.../org/apache/accumulo/core/summary/Gatherer.java | 22 +++---
.../java/org/apache/accumulo/core/util/Merge.java | 62 ++++++++--------
.../org/apache/accumulo/core/util/MergeTest.java | 31 +++++++-
.../manager/balancer/BalancerEnvironmentImpl.java | 12 ++--
.../accumulo/server/metadata/ServerAmpleImpl.java | 2 +-
.../main/java/org/apache/accumulo/gc/GCRun.java | 4 +-
.../accumulo/gc/GarbageCollectionAlgorithm.java | 84 +++++++++++-----------
.../manager/tableOps/bulkVer2/LoadFiles.java | 23 +++---
.../manager/tableOps/bulkVer2/PrepBulkImport.java | 42 ++++++++---
.../manager/tableOps/compact/CompactionDriver.java | 21 +++---
.../tableOps/bulkVer2/PrepBulkImportTest.java | 31 +++++---
.../org/apache/accumulo/tserver/ScanServer.java | 10 ++-
.../java/org/apache/accumulo/test/AmpleIT.java | 16 +++--
.../java/org/apache/accumulo/test/GCRunIT.java | 13 ++--
.../accumulo/test/ScanServerMetadataEntriesIT.java | 6 +-
.../test/compaction/ExternalCompaction4_IT.java | 29 +++++---
.../test/compaction/ExternalCompaction_1_IT.java | 26 ++++---
.../test/compaction/ExternalCompaction_2_IT.java | 8 +--
.../accumulo/test/functional/CompactionIT.java | 29 +++++---
.../test/functional/GarbageCollectorTrashBase.java | 17 ++---
.../accumulo/test/functional/MetadataIT.java | 15 ++--
23 files changed, 338 insertions(+), 244 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index e9e784b23e..3b7bf91612 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -666,9 +666,9 @@ public class TableOperationsImpl extends
TableOperationsHelper {
TableId tableId = context.getTableId(tableName);
while (true) {
- try {
- return
context.getAmple().readTablets().forTable(tableId).fetch(PREV_ROW).checkConsistency()
- .build().stream().map(tm ->
tm.getExtent().endRow()).filter(Objects::nonNull)
+ try (TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId)
+ .fetch(PREV_ROW).checkConsistency().build()) {
+ return tabletsMetadata.stream().map(tm ->
tm.getExtent().endRow()).filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (TabletDeletedException tde) {
// see if the table was deleted
@@ -1320,9 +1320,6 @@ public class TableOperationsImpl extends
TableOperationsHelper {
range = new Range(startRow, lastRow);
}
- TabletsMetadata tablets =
TabletsMetadata.builder(context).scanMetadataTable()
- .overRange(range).fetch(LOCATION, PREV_ROW).build();
-
KeyExtent lastExtent = null;
int total = 0;
@@ -1331,34 +1328,38 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Text continueRow = null;
MapCounter<String> serverCounts = new MapCounter<>();
- for (TabletMetadata tablet : tablets) {
- total++;
- Location loc = tablet.getLocation();
+ try (TabletsMetadata tablets =
TabletsMetadata.builder(context).scanMetadataTable()
+ .overRange(range).fetch(LOCATION, PREV_ROW).build()) {
+
+ for (TabletMetadata tablet : tablets) {
+ total++;
+ Location loc = tablet.getLocation();
- if ((expectedState == TableState.ONLINE
- && (loc == null || loc.getType() == LocationType.FUTURE))
- || (expectedState == TableState.OFFLINE && loc != null)) {
- if (continueRow == null) {
- continueRow = tablet.getExtent().toMetaRow();
+ if ((expectedState == TableState.ONLINE
+ && (loc == null || loc.getType() == LocationType.FUTURE))
+ || (expectedState == TableState.OFFLINE && loc != null)) {
+ if (continueRow == null) {
+ continueRow = tablet.getExtent().toMetaRow();
+ }
+ waitFor++;
+ lastRow = tablet.getExtent().toMetaRow();
+
+ if (loc != null) {
+ serverCounts.increment(loc.getHostPortSession(), 1);
+ }
}
- waitFor++;
- lastRow = tablet.getExtent().toMetaRow();
- if (loc != null) {
- serverCounts.increment(loc.getHostPortSession(), 1);
+ if (!tablet.getExtent().tableId().equals(tableId)) {
+ throw new AccumuloException(
+ "Saw unexpected table Id " + tableId + " " +
tablet.getExtent());
}
- }
- if (!tablet.getExtent().tableId().equals(tableId)) {
- throw new AccumuloException(
- "Saw unexpected table Id " + tableId + " " + tablet.getExtent());
- }
+ if (lastExtent != null &&
!tablet.getExtent().isPreviousExtent(lastExtent)) {
+ holes++;
+ }
- if (lastExtent != null &&
!tablet.getExtent().isPreviousExtent(lastExtent)) {
- holes++;
+ lastExtent = tablet.getExtent();
}
-
- lastExtent = tablet.getExtent();
}
if (continueRow != null) {
@@ -2059,8 +2060,11 @@ public class TableOperationsImpl extends
TableOperationsHelper {
@Override
public TimeType getTimeType(final String tableName) throws
TableNotFoundException {
TableId tableId = context.getTableId(tableName);
- Optional<TabletMetadata> tabletMetadata =
context.getAmple().readTablets().forTable(tableId)
-
.fetch(TabletMetadata.ColumnType.TIME).checkConsistency().build().stream().findFirst();
+ Optional<TabletMetadata> tabletMetadata;
+ try (TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId)
+ .fetch(TabletMetadata.ColumnType.TIME).checkConsistency().build()) {
+ tabletMetadata = tabletsMetadata.stream().findFirst();
+ }
TabletMetadata timeData =
tabletMetadata.orElseThrow(() -> new IllegalStateException("Failed to
retrieve TimeType"));
return timeData.getTime().getType();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
index f17639a474..acd5924ff9 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java
@@ -23,7 +23,6 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
@@ -88,8 +87,9 @@ class ConcurrentKeyExtentCache implements KeyExtentCache {
@VisibleForTesting
protected Stream<KeyExtent> lookupExtents(Text row) {
- return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row,
true, null)
- .checkConsistency().fetch(PREV_ROW).build().stream().limit(100)
+ TabletsMetadata tabletsMetadata =
TabletsMetadata.builder(ctx).forTable(tableId)
+ .overlapping(row, true,
null).checkConsistency().fetch(PREV_ROW).build();
+ return tabletsMetadata.stream().onClose(tabletsMetadata::close).limit(100)
.map(TabletMetadata::getExtent);
}
@@ -129,15 +129,8 @@ class ConcurrentKeyExtentCache implements KeyExtentCache {
for (Text lookupRow : lookupRows) {
if (getFromCache(lookupRow) == null) {
while (true) {
- try {
- Iterator<KeyExtent> iter = lookupExtents(lookupRow).iterator();
- while (iter.hasNext()) {
- KeyExtent ke2 = iter.next();
- if (inCache(ke2)) {
- break;
- }
- updateCache(ke2);
- }
+ try (Stream<KeyExtent> keyExtentStream =
lookupExtents(lookupRow)) {
+ keyExtentStream.takeWhile(ke2 ->
!inCache(ke2)).forEach(this::updateCache);
break;
} catch (TabletDeletedException tde) {
// tablets were merged away in the table, start over and try
again
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 2bbcdbbabd..e9ee45d763 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -166,17 +166,17 @@ public class Gatherer {
private Map<String,Map<StoredTabletFile,List<TRowRange>>>
getFilesGroupedByLocation(Predicate<StoredTabletFile> fileSelector) {
- Iterable<TabletMetadata> tmi =
TabletsMetadata.builder(ctx).forTable(tableId)
- .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST,
PREV_ROW).build();
-
// get a subset of files
Map<StoredTabletFile,List<TabletMetadata>> files = new HashMap<>();
- for (TabletMetadata tm : tmi) {
- for (StoredTabletFile file : tm.getFiles()) {
- if (fileSelector.test(file)) {
- // TODO push this filtering to server side and possibly use batch
scanner
- files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm);
+ try (TabletsMetadata tmi = TabletsMetadata.builder(ctx).forTable(tableId)
+ .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST,
PREV_ROW).build()) {
+ for (TabletMetadata tm : tmi) {
+ for (StoredTabletFile file : tm.getFiles()) {
+ if (fileSelector.test(file)) {
+ // TODO push this filtering to server side and possibly use batch
scanner
+ files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm);
+ }
}
}
}
@@ -447,8 +447,10 @@ public class Gatherer {
private int countFiles() {
// TODO use a batch scanner + iterator to parallelize counting files
- return
TabletsMetadata.builder(ctx).forTable(tableId).overlapping(startRow, endRow)
- .fetch(FILES, PREV_ROW).build().stream().mapToInt(tm ->
tm.getFiles().size()).sum();
+ try (TabletsMetadata tabletsMetadata =
TabletsMetadata.builder(ctx).forTable(tableId)
+ .overlapping(startRow, endRow).fetch(FILES, PREV_ROW).build()) {
+ return tabletsMetadata.stream().mapToInt(tm ->
tm.getFiles().size()).sum();
+ }
}
private class GatherRequest implements Supplier<SummaryCollection> {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
index 41f5a67943..2acfd6457a 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -149,19 +149,43 @@ public class Merge {
}
List<Size> sizes = new ArrayList<>();
long totalSize = 0;
- // Merge any until you get larger than the goal size, and then merge one
less tablet
- Iterator<Size> sizeIterator = getSizeIterator(client, table, start, end);
- while (sizeIterator.hasNext()) {
- Size next = sizeIterator.next();
- totalSize += next.size;
- sizes.add(next);
- if (totalSize > goalSize) {
- totalSize = mergeMany(client, table, sizes, goalSize, force, false);
+
+ TableId tableId;
+ ClientContext context = (ClientContext) client;
+ try {
+ tableId = context.getTableId(table);
+ } catch (Exception e) {
+ throw new MergeException(e);
+ }
+
+ try (TabletsMetadata tablets =
TabletsMetadata.builder(context).scanMetadataTable()
+ .overRange(new KeyExtent(tableId, end,
start).toMetaRange()).fetch(FILES, PREV_ROW)
+ .build()) {
+
+ Iterator<Size> sizeIterator = tablets.stream().map(tm -> {
+ long size =
tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
+ return new Size(tm.getExtent(), size);
+ }).iterator();
+
+ while (sizeIterator.hasNext()) {
+ Size next = sizeIterator.next();
+ totalSize += next.size;
+ sizes.add(next);
+
+ if (totalSize > goalSize) {
+ mergeMany(client, table, sizes, goalSize, force, false);
+ sizes.clear();
+ sizes.add(next);
+ totalSize = next.size;
+ }
}
}
+
+ // merge one less tablet
if (sizes.size() > 1) {
mergeMany(client, table, sizes, goalSize, force, true);
}
+
} catch (Exception ex) {
throw new MergeException(ex);
}
@@ -239,26 +263,4 @@ public class Merge {
}
}
- protected Iterator<Size> getSizeIterator(AccumuloClient client, String
tablename, Text start,
- Text end) throws MergeException {
- // open up metadata, walk through the tablets.
-
- TableId tableId;
- TabletsMetadata tablets;
- try {
- ClientContext context = (ClientContext) client;
- tableId = context.getTableId(tablename);
- tablets = TabletsMetadata.builder(context).scanMetadataTable()
- .overRange(new KeyExtent(tableId, end,
start).toMetaRange()).fetch(FILES, PREV_ROW)
- .build();
- } catch (Exception e) {
- throw new MergeException(e);
- }
-
- return tablets.stream().map(tm -> {
- long size =
tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
- return new Size(tm.getExtent(), size);
- }).iterator();
- }
-
}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java
b/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java
index 6d253fe0ae..a37a402665 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.util.Merge.Size;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -57,8 +58,34 @@ public class MergeTest {
protected void message(String format, Object... args) {}
@Override
- protected Iterator<Size> getSizeIterator(AccumuloClient client, String
tablename,
- final Text start, final Text end) throws MergeException {
+ public void mergomatic(AccumuloClient client, String table, Text start,
Text end, long goalSize,
+ boolean force) throws MergeException {
+ if (table.equals(MetadataTable.NAME)) {
+ throw new IllegalArgumentException("cannot merge tablets on the
metadata table");
+ }
+
+ List<Size> sizes = new ArrayList<>();
+ long totalSize = 0;
+
+ Iterator<Size> sizeIterator = getSizeIterator(start, end);
+
+ while (sizeIterator.hasNext()) {
+ Size next = sizeIterator.next();
+ totalSize += next.size;
+ sizes.add(next);
+ if (totalSize > goalSize) {
+ mergeMany(client, table, sizes, goalSize, force, false);
+ sizes.clear();
+ sizes.add(next);
+ totalSize = next.size;
+ }
+ }
+ if (sizes.size() > 1) {
+ mergeMany(client, table, sizes, goalSize, force, true);
+ }
+ }
+
+ protected Iterator<Size> getSizeIterator(final Text start, final Text end)
{
final Iterator<Size> impl = tablets.iterator();
return new Iterator<>() {
Size next = skip();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
index a2a1a1b1ba..3018ae1910 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
@@ -75,11 +75,13 @@ public class BalancerEnvironmentImpl extends
ServiceEnvironmentImpl implements B
@Override
public Map<TabletId,TabletServerId> listTabletLocations(TableId tableId) {
Map<TabletId,TabletServerId> tablets = new LinkedHashMap<>();
- for (var tm :
TabletsMetadata.builder(getContext()).forTable(tableId).fetch(LOCATION,
PREV_ROW)
- .build()) {
- tablets.put(new TabletIdImpl(tm.getExtent()),
- TabletServerIdImpl.fromThrift(Optional.ofNullable(tm.getLocation())
- .map(TabletMetadata.Location::getServerInstance).orElse(null)));
+ try (TabletsMetadata tabletsMetadata =
+
TabletsMetadata.builder(getContext()).forTable(tableId).fetch(LOCATION,
PREV_ROW).build()) {
+ for (var tm : tabletsMetadata) {
+ tablets.put(new TabletIdImpl(tm.getExtent()),
+ TabletServerIdImpl.fromThrift(Optional.ofNullable(tm.getLocation())
+
.map(TabletMetadata.Location::getServerInstance).orElse(null)));
+ }
}
return tablets;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 7aa6ecb6e1..fd6b55727e 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -364,7 +364,7 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
Scanner scanner = context.createScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY);
scanner.setRange(ScanServerFileReferenceSection.getRange());
int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
- return StreamSupport.stream(scanner.spliterator(), false)
+ return scanner.stream().onClose(scanner::close)
.map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
} catch (TableNotFoundException e) {
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index db09b7ac7a..798bb6e535 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -175,9 +175,9 @@ public class GCRun implements GarbageCollectionEnvironment {
if (level == Ample.DataLevel.ROOT) {
tabletStream = Stream.of(context.getAmple().readTablet(RootTable.EXTENT,
DIR, FILES, SCANS));
} else {
- var tabletsMetadata =
TabletsMetadata.builder(context).scanTable(level.metaTable())
+ TabletsMetadata tm =
TabletsMetadata.builder(context).scanTable(level.metaTable())
.checkConsistency().fetch(DIR, FILES, SCANS).build();
- tabletStream = tabletsMetadata.stream();
+ tabletStream = tm.stream().onClose(tm::close);
}
// there is a lot going on in this "one line" so see below for more info
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 48360b3ef3..ab9bff706e 100644
---
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -143,55 +143,57 @@ public class GarbageCollectionAlgorithm {
List<GcCandidate> candidateEntriesToBeDeleted = new ArrayList<>();
Set<TableId> tableIdsBefore = gce.getCandidateTableIDs();
Set<TableId> tableIdsSeen = new HashSet<>();
- Iterator<Reference> iter = gce.getReferences().iterator();
- while (iter.hasNext()) {
- Reference ref = iter.next();
- tableIdsSeen.add(ref.getTableId());
+ try (Stream<Reference> references = gce.getReferences()) {
+ references.forEach(ref -> {
+ tableIdsSeen.add(ref.getTableId());
- if (ref.isDirectory()) {
- var dirReference = (ReferenceDirectory) ref;
- ServerColumnFamily.validateDirCol(dirReference.getTabletDir());
+ if (ref.isDirectory()) {
+ var dirReference = (ReferenceDirectory) ref;
+ ServerColumnFamily.validateDirCol(dirReference.getTabletDir());
- String dir = "/" + dirReference.tableId + "/" +
dirReference.getTabletDir();
+ String dir = "/" + dirReference.tableId + "/" +
dirReference.getTabletDir();
- dir = makeRelative(dir, 2);
+ dir = makeRelative(dir, 2);
- GcCandidate gcTemp = candidateMap.remove(dir);
- if (gcTemp != null) {
- log.debug("Directory Candidate was still in use by dir ref: {}",
dir);
- // Do not add dir candidates to candidateEntriesToBeDeleted as they
are only created once.
- }
- } else {
- String reference = ref.getMetadataPath();
- if (reference.startsWith("/")) {
- log.debug("Candidate {} has a relative path, prepend tableId {}",
reference,
- ref.getTableId());
- reference = "/" + ref.getTableId() + ref.getMetadataPath();
- } else if (!reference.contains(":") && !reference.startsWith("../")) {
- throw new RuntimeException("Bad file reference " + reference);
- }
+ GcCandidate gcTemp = candidateMap.remove(dir);
+ if (gcTemp != null) {
+ log.debug("Directory Candidate was still in use by dir ref: {}",
dir);
+ // Do not add dir candidates to candidateEntriesToBeDeleted as
they are only created
+ // once.
+ }
+ } else {
+ String reference = ref.getMetadataPath();
+ if (reference.startsWith("/")) {
+ log.debug("Candidate {} has a relative path, prepend tableId {}",
reference,
+ ref.getTableId());
+ reference = "/" + ref.getTableId() + ref.getMetadataPath();
+ } else if (!reference.contains(":") && !reference.startsWith("../"))
{
+ throw new RuntimeException("Bad file reference " + reference);
+ }
- String relativePath = makeRelative(reference, 3);
-
- // WARNING: This line is EXTREMELY IMPORTANT.
- // You MUST REMOVE candidates that are still in use
- GcCandidate gcTemp = candidateMap.remove(relativePath);
- if (gcTemp != null) {
- log.debug("File Candidate was still in use: {}", relativePath);
- // Prevent deletion of candidates that are still in use by scans,
because they won't be
- // recreated once the scan is finished.
- if (!ref.isScan()) {
- candidateEntriesToBeDeleted.add(gcTemp);
+ String relativePath = makeRelative(reference, 3);
+
+ // WARNING: This line is EXTREMELY IMPORTANT.
+ // You MUST REMOVE candidates that are still in use
+ GcCandidate gcTemp = candidateMap.remove(relativePath);
+ if (gcTemp != null) {
+ log.debug("File Candidate was still in use: {}", relativePath);
+ // Prevent deletion of candidates that are still in use by scans,
because they won't be
+ // recreated once the scan is finished.
+ if (!ref.isScan()) {
+ candidateEntriesToBeDeleted.add(gcTemp);
+ }
}
- }
- String dir = relativePath.substring(0, relativePath.lastIndexOf('/'));
- GcCandidate gcT = candidateMap.remove(dir);
- if (gcT != null) {
- log.debug("Directory Candidate was still in use by file ref: {}",
relativePath);
- // Do not add dir candidates to candidateEntriesToBeDeleted as they
are only created once.
+ String dir = relativePath.substring(0,
relativePath.lastIndexOf('/'));
+ GcCandidate gcT = candidateMap.remove(dir);
+ if (gcT != null) {
+ log.debug("Directory Candidate was still in use by file ref: {}",
relativePath);
+ // Do not add dir candidates to candidateEntriesToBeDeleted as
they are only created
+ // once.
+ }
}
- }
+ });
}
Set<TableId> tableIdsAfter = gce.getCandidateTableIDs();
ensureAllTablesChecked(Collections.unmodifiableSet(tableIdsBefore),
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index a5234819dc..02924116ec 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -328,24 +328,25 @@ class LoadFiles extends ManagerRepo {
Text startRow = loadMapEntry.getKey().prevEndRow();
- Iterator<TabletMetadata> tabletIter =
-
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow,
null)
- .checkConsistency().fetch(PREV_ROW, LOCATION,
LOADED).build().iterator();
-
Loader loader;
if (bulkInfo.tableState == TableState.ONLINE) {
loader = new OnlineLoader();
} else {
loader = new OfflineLoader();
}
-
+ long t1;
loader.start(bulkDir, manager, tid, bulkInfo.setTime);
-
- long t1 = System.currentTimeMillis();
- while (lmi.hasNext()) {
- loadMapEntry = lmi.next();
- List<TabletMetadata> tablets =
findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
- loader.load(tablets, loadMapEntry.getValue());
+ try (TabletsMetadata tabletsMetadata =
+
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow,
null)
+ .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build()) {
+
+ t1 = System.currentTimeMillis();
+ while (lmi.hasNext()) {
+ loadMapEntry = lmi.next();
+ List<TabletMetadata> tablets =
+ findOverlappingTablets(loadMapEntry.getKey(),
tabletsMetadata.iterator());
+ loader.load(tablets, loadMapEntry.getValue());
+ }
}
long sleepTime = loader.finish();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index 2b8788f887..e66796c54d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -22,6 +22,7 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -102,7 +103,7 @@ public class PrepBulkImport extends ManagerRepo {
}
@VisibleForTesting
- interface TabletIterFactory {
+ interface TabletIterFactory extends AutoCloseable {
Iterator<KeyExtent> newTabletIter(Text startRow);
}
@@ -194,6 +195,33 @@ public class PrepBulkImport extends ManagerRepo {
return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(),
firstTablet.prevEndRow());
}
+ private static class TabletIterFactoryImpl implements TabletIterFactory {
+ private final List<AutoCloseable> resourcesToClose = new ArrayList<>();
+ private final Manager manager;
+ private final BulkInfo bulkInfo;
+
+ public TabletIterFactoryImpl(Manager manager, BulkInfo bulkInfo) {
+ this.manager = manager;
+ this.bulkInfo = bulkInfo;
+ }
+
+ @Override
+ public Iterator<KeyExtent> newTabletIter(Text startRow) {
+ TabletsMetadata tabletsMetadata =
+
TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
+ .overlapping(startRow,
null).checkConsistency().fetch(PREV_ROW).build();
+ resourcesToClose.add(tabletsMetadata);
+ return
tabletsMetadata.stream().map(TabletMetadata::getExtent).iterator();
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (AutoCloseable resource : resourcesToClose) {
+ resource.close();
+ }
+ }
+ }
+
private KeyExtent checkForMerge(final long tid, final Manager manager)
throws Exception {
VolumeManager fs = manager.getVolumeManager();
@@ -202,14 +230,10 @@ public class PrepBulkImport extends ManagerRepo {
int maxTablets =
manager.getContext().getTableConfiguration(bulkInfo.tableId)
.getCount(Property.TABLE_BULK_MAX_TABLETS);
- try (LoadMappingIterator lmi =
- BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId,
fs::open)) {
-
- TabletIterFactory tabletIterFactory =
- startRow ->
TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
- .overlapping(startRow,
null).checkConsistency().fetch(PREV_ROW).build().stream()
- .map(TabletMetadata::getExtent).iterator();
-
+ try (
+ LoadMappingIterator lmi =
+ BulkSerialize.readLoadMapping(bulkDir.toString(),
bulkInfo.tableId, fs::open);
+ TabletIterFactory tabletIterFactory = new
TabletIterFactoryImpl(manager, bulkInfo)) {
return validateLoadMapping(bulkInfo.tableId.canonical(), lmi,
tabletIterFactory, maxTablets,
tid);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index b17ce8b63c..d005faf04e 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -104,18 +104,19 @@ class CompactionDriver extends ManagerRepo {
int tabletsToWaitFor = 0;
int tabletCount = 0;
- TabletsMetadata tablets =
TabletsMetadata.builder(manager.getContext()).forTable(tableId)
- .overlapping(startRow, endRow).fetch(LOCATION, PREV_ROW,
COMPACT_ID).build();
-
- for (TabletMetadata tablet : tablets) {
- if (tablet.getCompactId().orElse(-1) < compactId) {
- tabletsToWaitFor++;
- if (tablet.hasCurrent()) {
- serversToFlush.increment(tablet.getLocation().getServerInstance(),
1);
+ try (TabletsMetadata tablets =
TabletsMetadata.builder(manager.getContext()).forTable(tableId)
+ .overlapping(startRow, endRow).fetch(LOCATION, PREV_ROW,
COMPACT_ID).build()) {
+
+ for (TabletMetadata tablet : tablets) {
+ if (tablet.getCompactId().orElse(-1) < compactId) {
+ tabletsToWaitFor++;
+ if (tablet.hasCurrent()) {
+ serversToFlush.increment(tablet.getLocation().getServerInstance(),
1);
+ }
}
- }
- tabletCount++;
+ tabletCount++;
+ }
}
long scanTime = System.currentTimeMillis() - t1;
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index 62bbbd1402..dff909896b 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -96,21 +97,29 @@ public class PrepBulkImportTest {
public void runTest(Map<KeyExtent,String> loadRanges, List<KeyExtent>
tabletRanges,
int maxTablets) throws Exception {
- TabletIterFactory tabletIterFactory = startRow -> {
- int start = -1;
-
- if (startRow == null) {
- start = 0;
- } else {
- for (int i = 0; i < tabletRanges.size(); i++) {
- if (tabletRanges.get(i).contains(startRow)) {
- start = i;
- break;
+ TabletIterFactory tabletIterFactory = new TabletIterFactory() {
+ @Override
+ public Iterator<KeyExtent> newTabletIter(Text startRow) {
+ int start = -1;
+
+ if (startRow == null) {
+ start = 0;
+ } else {
+ for (int i = 0; i < tabletRanges.size(); i++) {
+ if (tabletRanges.get(i).contains(startRow)) {
+ start = i;
+ break;
+ }
}
}
+
+ return tabletRanges.subList(start, tabletRanges.size()).iterator();
}
- return tabletRanges.subList(start, tabletRanges.size()).iterator();
+ @Override
+ public void close() {
+ // nothing to close
+ }
};
var sortedExtents =
loadRanges.keySet().stream().sorted().collect(Collectors.toList());
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index fe227959f7..00ffa10391 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -77,6 +77,7 @@ import
org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
@@ -152,10 +153,15 @@ public class ScanServer extends AbstractServer
@Override
public Map<? extends KeyExtent,? extends TabletMetadata>
loadAll(Set<? extends KeyExtent> keys) {
+ Map<KeyExtent,TabletMetadata> tms;
long t1 = System.currentTimeMillis();
@SuppressWarnings("unchecked")
- var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys,
Optional.empty())
- .build().stream().collect(Collectors.toMap(tm -> tm.getExtent(), tm
-> tm));
+ Collection<KeyExtent> extents = (Collection<KeyExtent>) keys;
+ try (TabletsMetadata tabletsMetadata =
+ ample.readTablets().forTablets(extents, Optional.empty()).build()) {
+ tms = tabletsMetadata.stream().onClose(tabletsMetadata::close)
+ .collect(Collectors.toMap(TabletMetadata::getExtent, tm -> tm));
+ }
long t2 = System.currentTimeMillis();
LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
return tms;
diff --git a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
index fb78b7fad8..e89a1fd4b4 100644
--- a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -46,14 +47,19 @@ public class AmpleIT extends AccumuloClusterHarness {
private void runFetchTest(Ample ample, List<KeyExtent> extentsToFetch,
Set<KeyExtent> expected,
Set<KeyExtent> expectMissing) {
+ Set<KeyExtent> extentsSeen;
// always run a test without a consumer for not seen tablets as this takes
a different code path
- var extentsSeen = ample.readTablets().forTablets(extentsToFetch,
Optional.empty()).build()
- .stream().map(TabletMetadata::getExtent).collect(toSet());
- assertEquals(expected, extentsSeen);
+ try (TabletsMetadata tm =
+ ample.readTablets().forTablets(extentsToFetch,
Optional.empty()).build()) {
+ extentsSeen =
tm.stream().map(TabletMetadata::getExtent).collect(toSet());
+ assertEquals(expected, extentsSeen);
+ }
HashSet<KeyExtent> extentsNotSeen = new HashSet<>();
- extentsSeen = ample.readTablets().forTablets(extentsToFetch,
Optional.of(extentsNotSeen::add))
- .build().stream().map(TabletMetadata::getExtent).collect(toSet());
+ try (TabletsMetadata tm =
+ ample.readTablets().forTablets(extentsToFetch,
Optional.of(extentsNotSeen::add)).build()) {
+ extentsSeen =
tm.stream().map(TabletMetadata::getExtent).collect(toSet());
+ }
assertEquals(expected, extentsSeen);
assertEquals(expectMissing, extentsNotSeen);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/GCRunIT.java b/test/src/main/java/org/apache/accumulo/test/GCRunIT.java
index c57f4be4dc..a48b0a595e 100644
--- a/test/src/main/java/org/apache/accumulo/test/GCRunIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GCRunIT.java
@@ -27,7 +27,7 @@ import java.time.Duration;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -155,16 +155,11 @@ public class GCRunIT extends SharedMiniClusterBase {
public void forceMissingPrevRowTest() {}
private void scanReferences(GCRun userGC) {
- final AtomicInteger counter = new AtomicInteger(0);
// loop through the user table references - the row deleted above should
violate dir present.
- var userTableIter = userGC.getReferences().iterator();
- while (userTableIter.hasNext()) {
- Reference ref = userTableIter.next();
- counter.incrementAndGet();
- log.trace("user ref: {}", ref);
+ try (Stream<Reference> references = userGC.getReferences()) {
+ long count = references.peek(ref -> log.trace("user ref: {}",
ref)).count();
+ assertTrue(count > 0);
}
-
- assertTrue(counter.get() > 0);
}
private void fillMetadataEntries(final String table1, final String clone1)
throws Exception {
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
index de7413a898..68b3bff0c7 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
@@ -251,8 +251,10 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
assertEquals(fileCount, metadataScanFileRefs.size());
assertEquals(fileCount,
ctx.getAmple().getScanServerFileReferences().count());
-
- List<Reference> refs = gc.getReferences().collect(Collectors.toList());
+ List<Reference> refs;
+ try (Stream<Reference> references = gc.getReferences()) {
+ refs = references.collect(Collectors.toList());
+ }
assertTrue(refs.size() > fileCount * 2);
List<Reference> tableRefs =
refs.stream().filter(r -> r.getTableId().equals(tid) &&
!r.isDirectory())
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
index 12b0eb5286..b52e7a41c7 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
@@ -71,9 +71,11 @@ public class ExternalCompaction4_IT extends
AccumuloClusterHarness {
ReadWriteIT.verify(client, 50, 1, 1, 0, table1);
Ample ample = ((ClientContext) client).getAmple();
- TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- TabletMetadata tm = tms.iterator().next();
- assertEquals(50, tm.getFiles().size());
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(50, tm.getFiles().size());
+ }
IteratorSetting setting = new IteratorSetting(50, "ageoff",
AgeOffFilter.class);
setting.addOption("ttl", "0");
@@ -87,8 +89,9 @@ public class ExternalCompaction4_IT extends
AccumuloClusterHarness {
client.tableOperations().attachIterator(table1, setting2,
EnumSet.of(IteratorScope.majc));
client.tableOperations().compact(table1, new
CompactionConfig().setWait(true));
- assertThrows(NoSuchElementException.class, () ->
ample.readTablets().forTable(tid)
- .fetch(ColumnType.FILES).build().iterator().next());
+ try (TabletsMetadata tm =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ assertThrows(NoSuchElementException.class, () -> tm.iterator().next());
+ }
assertEquals(0, client.createScanner(table1).stream().count());
} finally {
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
@@ -110,18 +113,22 @@ public class ExternalCompaction4_IT extends
AccumuloClusterHarness {
ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 1);
Ample ample = ((ClientContext) client).getAmple();
- TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- TabletMetadata tm = tms.iterator().next();
- assertEquals(1000, tm.getFiles().size());
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(1000, tm.getFiles().size());
+ }
IteratorSetting setting = new IteratorSetting(50, "error",
ErrorThrowingIterator.class);
setting.addOption(ErrorThrowingIterator.TIMES, "3");
client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorScope.majc));
client.tableOperations().compact(table1, new
CompactionConfig().setWait(true));
- tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- tm = tms.iterator().next();
- assertEquals(1, tm.getFiles().size());
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(1, tm.getFiles().size());
+ }
ReadWriteIT.verify(client, 1000, 1, 1, 0, table1);
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 35630437ab..1713c4f496 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.compaction;
+import static com.google.common.collect.MoreCollectors.onlyElement;
import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
@@ -409,20 +410,17 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
}
LOG.info("Validating metadata table contents.");
- TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- List<TabletMetadata> md = new ArrayList<>();
- tm.forEach(t -> md.add(t));
- assertEquals(1, md.size());
- TabletMetadata m = md.get(0);
- Map<ExternalCompactionId,ExternalCompactionMetadata> em =
m.getExternalCompactions();
- assertEquals(1, em.size());
- List<ExternalCompactionFinalState> finished = new ArrayList<>();
- getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f));
- assertEquals(1, finished.size());
- assertEquals(em.entrySet().iterator().next().getKey(),
- finished.get(0).getExternalCompactionId());
- tm.close();
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ TabletMetadata m = tm.stream().collect(onlyElement());
+ Map<ExternalCompactionId,ExternalCompactionMetadata> em =
m.getExternalCompactions();
+ assertEquals(1, em.size());
+ List<ExternalCompactionFinalState> finished = new ArrayList<>();
+ getFinalStatesForTable(getCluster(), tid).forEach(f ->
finished.add(f));
+ assertEquals(1, finished.size());
+ assertEquals(em.entrySet().iterator().next().getKey(),
+ finished.get(0).getExternalCompactionId());
+ }
// Force a flush on the metadata table before killing our tserver
client.tableOperations().flush(MetadataTable.NAME);
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index 9b0a94efae..3b902a5eab 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -336,10 +336,10 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);
- TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- assertEquals(0, tm.stream().count());
- tm.close();
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ assertEquals(0, tm.stream().count());
+ }
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index f29227b79e..a5e3ea4c1b 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -377,18 +377,22 @@ public class CompactionIT extends AccumuloClusterHarness {
ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1);
Ample ample = ((ClientContext) client).getAmple();
- TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- TabletMetadata tm = tms.iterator().next();
- assertEquals(1000, tm.getFiles().size());
+ try (TabletsMetadata tms =
+ ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(1000, tm.getFiles().size());
+ }
IteratorSetting setting = new IteratorSetting(50, "error",
ErrorThrowingIterator.class);
setting.addOption(ErrorThrowingIterator.TIMES, "3");
client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorScope.majc));
client.tableOperations().compact(table1, new
CompactionConfig().setWait(true));
- tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- tm = tms.iterator().next();
- assertEquals(1, tm.getFiles().size());
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(1, tm.getFiles().size());
+ }
ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1);
@@ -408,9 +412,11 @@ public class CompactionIT extends AccumuloClusterHarness {
ReadWriteIT.verify(client, 50, 1, 1, 0, table1);
Ample ample = ((ClientContext) client).getAmple();
- TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
- TabletMetadata tm = tms.iterator().next();
- assertEquals(50, tm.getFiles().size());
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(50, tm.getFiles().size());
+ }
IteratorSetting setting = new IteratorSetting(50, "ageoff",
AgeOffFilter.class);
setting.addOption("ttl", "0");
@@ -424,8 +430,9 @@ public class CompactionIT extends AccumuloClusterHarness {
client.tableOperations().attachIterator(table1, setting2,
EnumSet.of(IteratorScope.majc));
client.tableOperations().compact(table1, new
CompactionConfig().setWait(true));
- assertThrows(NoSuchElementException.class, () ->
ample.readTablets().forTable(tid)
- .fetch(ColumnType.FILES).build().iterator().next());
+ try (TabletsMetadata tm =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ assertThrows(NoSuchElementException.class, () -> tm.iterator().next());
+ }
assertEquals(0, client.createScanner(table1).stream().count());
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java
index f1411a04fc..8904dadd67 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.TableId;
@@ -51,14 +52,14 @@ public class GarbageCollectorTrashBase extends
ConfigurableMacBase {
protected ArrayList<StoredTabletFile> getFilesForTable(ServerContext ctx,
AccumuloClient client,
String tableName) {
String tid = client.tableOperations().tableIdMap().get(tableName);
- TabletsMetadata tms =
-
ctx.getAmple().readTablets().forTable(TableId.of(tid)).fetch(ColumnType.FILES).build();
- ArrayList<StoredTabletFile> files = new ArrayList<>();
- tms.forEach(tm -> {
- files.addAll(tm.getFiles());
- });
- LOG.debug("Tablet files: {}", files);
- return files;
+ try (TabletsMetadata tms =
+
ctx.getAmple().readTablets().forTable(TableId.of(tid)).fetch(ColumnType.FILES).build())
{
+ ArrayList<StoredTabletFile> files =
+ tms.stream().flatMap(tabletMetadata ->
tabletMetadata.getFiles().stream())
+ .collect(Collectors.toCollection(ArrayList::new));
+ LOG.debug("Tablet files: {}", files);
+ return files;
+ }
}
protected ArrayList<StoredTabletFile> loadData(ServerContext ctx,
AccumuloClient client,
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java
index 0571dbce7c..14ff0f90b1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java
@@ -34,6 +34,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -174,12 +175,16 @@ public class MetadataIT extends AccumuloClusterHarness {
Text startRow = new Text("a");
Text endRow = new Text("z");
- // Call up Ample from the client context using table "t" and build
- TabletsMetadata tablets =
cc.getAmple().readTablets().forTable(TableId.of("1"))
- .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST,
PREV_ROW).build();
+ TabletMetadata tabletMetadata0;
+ TabletMetadata tabletMetadata1;
- TabletMetadata tabletMetadata0 =
tablets.stream().findFirst().orElseThrow();
- TabletMetadata tabletMetadata1 =
tablets.stream().skip(1).findFirst().orElseThrow();
+ // Call up Ample from the client context using table "t" and build
+ try (TabletsMetadata tm =
cc.getAmple().readTablets().forTable(TableId.of("1"))
+ .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST,
PREV_ROW).build()) {
+ var tablets = tm.stream().limit(2).collect(Collectors.toList());
+ tabletMetadata0 = tablets.get(0);
+ tabletMetadata1 = tablets.get(1);
+ }
String infoTabletId0 = tabletMetadata0.getTableId().toString();
String infoExtent0 = tabletMetadata0.getExtent().toString();