This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8926fe7a75 Updates merge code to support merging many tablets (#3934) 8926fe7a75 is described below commit 8926fe7a756308b7dfd14fc8f73ec70a75f94d3a Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Nov 9 11:10:04 2023 -0500 Updates merge code to support merging many tablets (#3934) Using the conditional writer changes from #3929, this commit updates the merge code to support merging more tablets than will fit in memory. Also adds merge and clone operations to the SplitMillionIT and merges a table with one million tablets in the test. Without the changes in this commit, that test would cause the manager to die with an out-of-memory error. Added a check to ensure the number of files in the merge range does not exceed a configurable maximum. With these changes it's possible to merge lots (like millions) of empty tablets. However, if merging lots of tablets with files, then the new check will trigger to avoid creating a tablet with too many files. Added a test for merging tablets with too many files. 
--- .../org/apache/accumulo/core/conf/Property.java | 7 ++ .../manager/tableOps/merge/CountFiles.java | 86 +++++++++++++++ .../manager/tableOps/merge/DeleteTablets.java | 119 +++++++++++++++++++++ .../manager/tableOps/merge/FinishTableRangeOp.java | 58 ++++++---- .../manager/tableOps/merge/MergeTablets.java | 43 ++------ .../manager/tableOps/merge/ReserveTablets.java | 83 +++++++------- .../manager/tableOps/merge/UnreserveAndError.java | 53 +++++++++ .../accumulo/test/functional/DeleteRowsIT.java | 90 ++++++++++++++++ .../apache/accumulo/test/functional/MergeIT.java | 66 ++++++++++++ .../accumulo/test/functional/SplitMillionIT.java | 38 +++++-- 10 files changed, 540 insertions(+), 103 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d2dea79464..f89825d709 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -937,6 +937,13 @@ public enum Property { + " from having more RFiles than can be opened. Setting this property low may" + " throttle ingest and increase query performance.", "1.4.0"), + TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT, + "The maximum number of files that a merge operation will process. Before " + + "merging a sum of the number of files in the merge range is computed and if it " + + "exceeds this configuration then the merge will error and fail. For example if " + + "there are 100 tablets each having 10 files in the merge range, then the sum would " + + "be 1000 and the merge will only proceed if this property is greater than 1000.", + "4.0.0"), TABLE_FILE_SUMMARY_MAX_SIZE("table.file.summary.maxSize", "256k", PropertyType.BYTES, "The maximum size summary that will be stored. 
The number of RFiles that" + " had summary data exceeding this threshold is reported by" diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java new file mode 100644 index 0000000000..a6f09dcbe0 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.merge; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CountFiles extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(CountFiles.class); + private static final long serialVersionUID = 1L; + private final MergeInfo data; + + public CountFiles(MergeInfo mergeInfo) { + this.data = mergeInfo; + } + + @Override + public Repo<Manager> call(long tid, Manager env) throws Exception { + + var range = data.getReserveExtent(); + + long totalFiles = 0; + + try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) + .overlapping(range.prevEndRow(), range.endRow()).fetch(FILES).checkConsistency().build()) { + + switch (data.op) { + case MERGE: + for (var tabletMeta : tablets) { + totalFiles += tabletMeta.getFiles().size(); + } + break; + case DELETE: + for (var tabletMeta : tablets) { + // Files in tablets that are completely contained within the merge range will be + // deleted, so do not count these files . 
+ if (!data.getOriginalExtent().contains(tabletMeta.getExtent())) { + totalFiles += tabletMeta.getFiles().size(); + } + } + break; + default: + throw new IllegalStateException("Unknown op " + data.op); + } + } + + long maxFiles = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId()) + .getCount(Property.TABLE_MERGE_FILE_MAX); + + log.debug("{} found {} files in the merge range, maxFiles is {}", FateTxId.formatTid(tid), + totalFiles, maxFiles); + + if (totalFiles >= maxFiles) { + return new UnreserveAndError(data, totalFiles, maxFiles); + } else { + if (data.op == MergeInfo.Operation.MERGE) { + return new MergeTablets(data); + } else { + return new DeleteRows(data); + } + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java new file mode 100644 index 0000000000..f34708681a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.merge; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Delete tablets that were merged into another tablet. + */ +public class DeleteTablets extends ManagerRepo { + + private static final long serialVersionUID = 1L; + + private final MergeInfo data; + + private final byte[] lastTabletEndRow; + + private static final Logger log = LoggerFactory.getLogger(DeleteTablets.class); + + DeleteTablets(MergeInfo mergeInfo, Text lastTabletEndRow) { + this.data = mergeInfo; + this.lastTabletEndRow = lastTabletEndRow == null ? 
null : TextUtil.getBytes(lastTabletEndRow); + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + + var fateStr = FateTxId.formatTid(tid); + KeyExtent range = data.getMergeExtent(); + log.debug("{} Deleting tablets for {}", fateStr, range); + var opid = TabletOperationId.from(TabletOperationType.MERGING, tid); + + AtomicLong acceptedCount = new AtomicLong(); + AtomicLong rejectedCount = new AtomicLong(); + // delete tablets + BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + acceptedCount.incrementAndGet(); + } else { + log.error("{} failed to update {}", fateStr, extent); + rejectedCount.incrementAndGet(); + } + }; + + long submitted = 0; + + try ( + var tabletsMetadata = + manager.getContext().getAmple().readTablets().forTable(range.tableId()) + .overlapping(range.prevEndRow(), range.endRow()).saveKeyValues().build(); + var tabletsMutator = + manager.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) { + + var lastEndRow = lastTabletEndRow == null ? 
null : new Text(lastTabletEndRow); + + for (var tabletMeta : tabletsMetadata) { + MergeTablets.validateTablet(tabletMeta, fateStr, opid, data.tableId); + + // do not delete the last tablet + if (Objects.equals(tabletMeta.getExtent().endRow(), lastEndRow)) { + break; + } + + var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()) + .requireOperation(opid).requireAbsentLocation(); + + tabletMeta.getKeyValues().keySet().forEach(key -> { + log.trace("{} deleting {}", fateStr, key); + }); + + tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet()); + // if the tablet no longer exists, then it was successful + tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet()); + submitted++; + } + } + + Preconditions.checkState(acceptedCount.get() == submitted && rejectedCount.get() == 0, + "Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted, + rejectedCount.get()); + + log.debug("{} deleted {} tablets", fateStr, submitted); + + return new FinishTableRangeOp(data); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 323ca6bc0f..9ae16c1ac4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -22,6 +22,9 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; @@ 
-49,45 +52,60 @@ class FinishTableRangeOp extends ManagerRepo { @Override public Repo<Manager> call(long tid, Manager manager) throws Exception { + removeOperationIds(log, data, tid, manager); + + Utils.unreserveTable(manager, data.tableId, tid, true); + Utils.unreserveNamespace(manager, data.namespaceId, tid, false); + return null; + } + + static void removeOperationIds(Logger log, MergeInfo data, long tid, Manager manager) { KeyExtent range = data.getReserveExtent(); var opid = TabletOperationId.from(TabletOperationType.MERGING, tid); log.debug("{} unreserving tablet in range {}", FateTxId.formatTid(tid), range); + var fateStr = FateTxId.formatTid(tid); + + AtomicLong acceptedCount = new AtomicLong(); + AtomicLong rejectedCount = new AtomicLong(); + // delete tablets + BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + acceptedCount.incrementAndGet(); + } else { + log.error("{} failed to update {}", fateStr, extent); + rejectedCount.incrementAndGet(); + } + }; + + int submitted = 0; + int count = 0; try (var tablets = manager.getContext().getAmple().readTablets().forTable(data.tableId) .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, LOCATION, OPID).build(); - var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets();) { - int opsDeleted = 0; - int count = 0; + var tabletsMutator = + manager.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) { for (var tabletMeta : tablets) { if (opid.equals(tabletMeta.getOperationId())) { tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireOperation(opid) .deleteOperation().submit(tm -> !opid.equals(tm.getOperationId())); - opsDeleted++; + submitted++; } count++; } Preconditions.checkState(count > 0); + } - var results = tabletsMutator.process(); - var deletesAccepted = - results.values().stream().filter(conditionalResult -> conditionalResult.getStatus() - == 
Ample.ConditionalResult.Status.ACCEPTED).count(); - - log.debug("{} deleted {}/{} opids out of {} tablets", FateTxId.formatTid(tid), - deletesAccepted, opsDeleted, count); - - manager.getEventCoordinator().event(range, "Merge or deleterows completed %s", - FateTxId.formatTid(tid)); + log.debug("{} deleted {}/{} opids out of {} tablets", FateTxId.formatTid(tid), + acceptedCount.get(), submitted, count); - DeleteRows.verifyAccepted(results, FateTxId.formatTid(tid)); - Preconditions.checkState(deletesAccepted == opsDeleted); - } + manager.getEventCoordinator().event(range, "Merge or deleterows completed %s", + FateTxId.formatTid(tid)); - Utils.unreserveTable(manager, data.tableId, tid, true); - Utils.unreserveNamespace(manager, data.namespaceId, tid, false); - return null; + Preconditions.checkState(acceptedCount.get() == submitted && rejectedCount.get() == 0, + "Failed to delete tablets accepted:%s != %s rejected:%s", acceptedCount.get(), submitted, + rejectedCount.get()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index ddb1616f80..5e89a58d20 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -77,11 +76,6 @@ public class MergeTablets extends ManagerRepo { @Override public Repo<Manager> call(long tid, Manager manager) throws Exception { - mergeMetadataRecords(manager, tid); - return new FinishTableRangeOp(data); - } - - private void mergeMetadataRecords(Manager manager, 
long tid) throws AccumuloException { var fateStr = FateTxId.formatTid(tid); KeyExtent range = data.getMergeExtent(); log.debug("{} Merging metadata for {}", fateStr, range); @@ -130,6 +124,11 @@ public class MergeTablets extends ManagerRepo { // queue all tablets dirs except the last tablets to be added as GC candidates dirs.add(new AllVolumesDirectory(range.tableId(), tabletMeta.getDirName())); + if (dirs.size() > 1000) { + Preconditions.checkState(tabletsSeen > 1); + manager.getContext().getAmple().putGcFileAndDirCandidates(range.tableId(), dirs); + dirs.clear(); + } } } @@ -137,7 +136,7 @@ public class MergeTablets extends ManagerRepo { // The merge range overlaps a single tablet, so there is nothing to do. This could be // because there was only a single tablet before merge started or this operation completed // but the process died and now its running a 2nd time. - return; + return new FinishTableRangeOp(data); } Preconditions.checkState(lastTabletMeta != null, "%s no tablets seen in range %s", opid, @@ -190,35 +189,7 @@ public class MergeTablets extends ManagerRepo { // Accumulo GC will delete the dir manager.getContext().getAmple().putGcFileAndDirCandidates(range.tableId(), dirs); - // delete tablets - try ( - var tabletsMetadata = - manager.getContext().getAmple().readTablets().forTable(range.tableId()) - .overlapping(range.prevEndRow(), range.endRow()).saveKeyValues().build(); - var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - - for (var tabletMeta : tabletsMetadata) { - validateTablet(tabletMeta, fateStr, opid, data.tableId); - - // do not delete the last tablet - if (Objects.equals(tabletMeta.getExtent().endRow(), lastTabletMeta.getExtent().endRow())) { - break; - } - - var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()) - .requireOperation(opid).requireAbsentLocation(); - - tabletMeta.getKeyValues().keySet().forEach(key -> { - log.debug("{} deleting {}", fateStr, key); - }); - - 
tabletMutator.deleteAll(tabletMeta.getKeyValues().keySet()); - // if the tablet no longer exists, then it was successful - tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet()); - } - - verifyAccepted(tabletsMutator.process(), fateStr); - } + return new DeleteTablets(data, lastTabletMeta.getEndRow()); } static void validateTablet(TabletMetadata tabletMeta, String fateStr, TabletOperationId opid, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index 87f319f633..3a4e742d7c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -23,8 +23,13 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; @@ -53,20 +58,27 @@ public class ReserveTablets extends ManagerRepo { log.debug("{} reserving tablets in range {}", FateTxId.formatTid(tid), range); var opid = TabletOperationId.from(TabletOperationType.MERGING, tid); + AtomicLong opsAccepted = new AtomicLong(0); + BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + if (result.getStatus() 
== Status.ACCEPTED) { + opsAccepted.incrementAndGet(); + } + }; + + int count = 0; + int otherOps = 0; + int opsSet = 0; + int locations = 0; + int wals = 0; + try ( var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) .overlapping(range.prevEndRow(), range.endRow()).fetch(PREV_ROW, LOCATION, LOGS, OPID) .checkConsistency().build(); - var tabletsMutator = env.getContext().getAmple().conditionallyMutateTablets();) { - - int count = 0; - int otherOps = 0; - int opsSet = 0; - int locations = 0; - int wals = 0; + var tabletsMutator = + env.getContext().getAmple().conditionallyMutateTablets(resultConsumer)) { for (var tabletMeta : tablets) { - if (tabletMeta.getOperationId() == null) { tabletsMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation() .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId())); @@ -83,46 +95,39 @@ public class ReserveTablets extends ManagerRepo { count++; } + } - var opsAccepted = tabletsMutator.process().values().stream() - .filter(conditionalResult -> conditionalResult.getStatus() == Status.ACCEPTED).count(); - - log.debug( - "{} reserve tablets op:{} count:{} other opids:{} opids set:{} locations:{} accepted:{} wals:{}", - FateTxId.formatTid(tid), data.op, count, otherOps, opsSet, locations, opsAccepted, wals); - - // while there are table lock a tablet can be concurrently deleted, so should always see - // tablets - Preconditions.checkState(count > 0); + log.debug( + "{} reserve tablets op:{} count:{} other opids:{} opids set:{} locations:{} accepted:{} wals:{}", + FateTxId.formatTid(tid), data.op, count, otherOps, opsSet, locations, opsAccepted, wals); - if (locations > 0 && opsAccepted > 0) { - // operation ids were set and tablets have locations, so lets send a signal to get them - // unassigned - env.getEventCoordinator().event(range, "Tablets %d were reserved for merge %s", opsAccepted, - FateTxId.formatTid(tid)); - } + // while there are table lock a tablet can be concurrently 
deleted, so should always see + // tablets + Preconditions.checkState(count > 0); - if (locations > 0 || otherOps > 0 || wals > 0) { - // need to wait on these tablets - return Math.max(1000, count); - } + if (locations > 0 && opsAccepted.get() > 0) { + // operation ids were set and tablets have locations, so lets send a signal to get them + // unassigned + env.getEventCoordinator().event(range, "Tablets %d were reserved for merge %s", + opsAccepted.get(), FateTxId.formatTid(tid)); + } - if (opsSet != opsAccepted) { - // not all operation ids were set - return Math.max(1000, count); - } + if (locations > 0 || otherOps > 0 || wals > 0) { + // need to wait on these tablets + return Math.min(Math.max(1000, count), 60000); + } - // operations ids were set on all tablets and no tablets have locations, so ready - return 0; + if (opsSet != opsAccepted.get()) { + // not all operation ids were set + return Math.min(Math.max(1000, count), 60000); } + + // operations ids were set on all tablets and no tablets have locations, so ready + return 0; } @Override public Repo<Manager> call(long tid, Manager environment) throws Exception { - if (data.op == MergeInfo.Operation.MERGE) { - return new MergeTablets(data); - } else { - return new DeleteRows(data); - } + return new CountFiles(data); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java new file mode 100644 index 0000000000..6d6e7c9e78 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UnreserveAndError extends ManagerRepo { + private static final long serialVersionUID = 1L; + private static final Logger log = LoggerFactory.getLogger(UnreserveAndError.class); + private final MergeInfo mergeInfo; + private final long totalFiles; + private final long maxFiles; + + public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) { + this.mergeInfo = mergeInfo; + this.totalFiles = totalFiles; + this.maxFiles = maxFiles; + } + + @Override + public Repo<Manager> call(long tid, Manager environment) throws Exception { + FinishTableRangeOp.removeOperationIds(log, mergeInfo, tid, environment); + throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null, + mergeInfo.op == MergeInfo.Operation.MERGE ? 
TableOperation.MERGE + : TableOperation.DELETE_RANGE, + TableOperationExceptionType.OTHER, + "Aborted merge because it would produce a tablets with more files than the configured limit of " + + maxFiles + ". Observed " + totalFiles + " files in the merge range."); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java index fc8dc9fb3f..62be26a053 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsIT.java @@ -20,28 +20,38 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import 
org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -69,6 +79,86 @@ public class DeleteRowsIT extends AccumuloClusterHarness { return Duration.ofMinutes(5); } + @Test + public void tooManyFilesDeleteRowsTest() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName, + new NewTableConfiguration().setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), + "20000", Property.TABLE_MERGE_FILE_MAX.getKey(), "500"))); + + c.tableOperations().addSplits(tableName, + IntStream.range(1, 100).map(i -> i * 100).mapToObj(i -> String.format("%06d", i)) + .map(Text::new).collect(Collectors.toCollection(TreeSet::new))); + + // add two bogus files to each tablet, creating 40K file entries + c.tableOperations().offline(tableName, true); + try ( + var tablets = getServerContext().getAmple().readTablets() + .forTable(getServerContext().getTableId(tableName)).build(); + var tabletsMutator = getServerContext().getAmple().mutateTablets()) { + int fc = 0; + for (var tabletMeta : tablets) { + // add 500 files to each tablet + var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()); + for (int i = 0; i < 500; i++) { + StoredTabletFile f1 = StoredTabletFile.of(new Path( + "file:///accumulo/tables/1/" + tabletMeta.getDirName() + "/F" + fc++ + ".rf")); + DataFileValue dfv1 = new DataFileValue(4200, 42); + tabletMutator.putFile(f1, dfv1); + } + + tabletMutator.mutate(); + } + } + c.tableOperations().online(tableName, true); + + // table should now have 50000 files total + try (var tablets = getServerContext().getAmple().readTablets() + .forTable(getServerContext().getTableId(tableName)).build()) { + assertEquals(50000, + 
tablets.stream().mapToInt(tabletMetadata -> tabletMetadata.getFiles().size()).sum()); + } + + // should fail to merge because there are too many files in the merge range + var exception = + assertThrows(AccumuloException.class, () -> c.tableOperations().deleteRows(tableName, + new Text(String.format("%06d", 55)), new Text(String.format("%06d", 9907)))); + // message should contain the observed number of files that would be merged. Some tablets + // would be completely merged away so their files are not counted. + assertTrue(exception.getMessage().contains("1000")); + // message should contain the max files limit it saw + assertTrue(exception.getMessage().contains("500")); + + assertEquals(99, c.tableOperations().listSplits(tableName).size()); + + c.tableOperations().setProperty(tableName, Property.TABLE_MERGE_FILE_MAX.getKey(), "2000"); + + // with the higher merge file setting, the delete should be able to go through. The table has + // a total of 50K files, however only 1000 files should end up in the merged tablet because + // most tablets fall completely within the delete range. + Wait.waitFor(() -> { + try { + c.tableOperations().deleteRows(tableName, new Text(String.format("%06d", 55)), + new Text(String.format("%06d", 9907))); + return true; + } catch (AccumuloException e) { + // The property value has not updated in the manager yet. 
+ assertTrue(e.getMessage().contains("500")); + return false; + } + }); + + assertEquals(1, c.tableOperations().listSplits(tableName).size()); + // table should now have 1000 files total + try (var tablets = getServerContext().getAmple().readTablets() + .forTable(getServerContext().getTableId(tableName)).build()) { + assertEquals(1000, + tablets.stream().mapToInt(tabletMetadata -> tabletMetadata.getFiles().size()).sum()); + } + } + } + @Test public void testDeleteAllRows() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java index 94e5927927..a538d60bba 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -34,6 +35,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -64,6 +67,7 @@ import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import 
org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -88,6 +92,68 @@ public class MergeIT extends AccumuloClusterHarness { return Duration.ofMinutes(8); } + /** Checks that merging a range holding more files than table.merge.file.max fails with an AccumuloException whose message names both the observed file count and the limit, that the failed merge leaves every split in place, and that smaller merges over the same table then succeed. */ @Test + public void tooManyFilesMergeTest() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName, + new NewTableConfiguration().setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), + "20000", Property.TABLE_MERGE_FILE_MAX.getKey(), "12345"))); + + c.tableOperations().addSplits(tableName, + IntStream.range(1, 10001).mapToObj(i -> String.format("%06d", i)).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new))); + c.tableOperations().addSplits(tableName, + IntStream.range(10001, 20001).mapToObj(i -> String.format("%06d", i)).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new))); + + // add two bogus files to each tablet (20000 splits give 20001 tablets), creating 40002 file entries + c.tableOperations().offline(tableName, true); + try ( + var tablets = getServerContext().getAmple().readTablets() + .forTable(getServerContext().getTableId(tableName)).build(); + var mutator = getServerContext().getAmple().mutateTablets()) { + int fc = 0; + for (var tabletMeta : tablets) { + StoredTabletFile f1 = StoredTabletFile.of(new Path( + "file:///accumulo/tables/1/" + tabletMeta.getDirName() + "/F" + fc++ + ".rf")); + StoredTabletFile f2 = StoredTabletFile.of(new Path( + "file:///accumulo/tables/1/" + tabletMeta.getDirName() + "/F" + fc++ + ".rf")); + DataFileValue dfv1 = new DataFileValue(4200, 42); + DataFileValue dfv2 = new DataFileValue(4200, 42); + mutator.mutateTablet(tabletMeta.getExtent()).putFile(f1, dfv1).putFile(f2, dfv2).mutate(); + } + } + c.tableOperations().online(tableName, true); + + // should fail to merge because there are too many files in the merge range + var exception = assertThrows(AccumuloException.class, + () -> c.tableOperations().merge(tableName, null, null)); + // message should 
contain the observed number of files + assertTrue(exception.getMessage().contains("40002")); + // message should contain the max files limit it saw + assertTrue(exception.getMessage().contains("12345")); + + assertEquals(20000, c.tableOperations().listSplits(tableName).size()); + + // attempt to merge smaller ranges with fewer files, which should work; want to make sure the + // aborted merge did not leave the table in a bad state + Text prev = null; + for (int i = 1000; i <= 20000; i += 1000) { + Text end = new Text(String.format("%06d", i)); + c.tableOperations().merge(tableName, prev, end); + prev = end; + } + + assertEquals(20, c.tableOperations().listSplits(tableName).size()); + try (var tablets = getServerContext().getAmple().readTablets() + .forTable(getServerContext().getTableId(tableName)).build()) { + assertEquals(40002, + tablets.stream().mapToInt(tabletMetadata -> tabletMetadata.getFiles().size()).sum()); + } + } + } + @Test public void merge() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index 84ab0d0482..3bd25e4733 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -29,6 +29,7 @@ import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.CloneConfiguration; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -95,14 +96,7 @@ public class SplitMillionIT extends AccumuloClusterHarness { } long t3 = System.currentTimeMillis(); - - try (var scanner = c.createScanner(tableName)) { - scanner.setRange(new 
Range(row)); - Map<String,String> coords = scanner.stream().collect(Collectors.toMap( - e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); - assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); - } - + verifyRow(c, tableName, row); long t4 = System.currentTimeMillis(); log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); } @@ -113,11 +107,39 @@ public class SplitMillionIT extends AccumuloClusterHarness { assertEquals(1_000_000, count); log.info("Time to scan all tablets : {}ms", t2 - t1); + // clone the table to test cloning with lots of tablets and also to give merge its own table + // to work on + var cloneName = tableName + "_clone"; + t1 = System.currentTimeMillis(); + c.tableOperations().clone(tableName, cloneName, CloneConfiguration.builder().build()); + t2 = System.currentTimeMillis(); + log.info("Time to clone table : {}ms", t2 - t1); + + // merge the clone, so that delete table can run later on a table with lots and lots of tablets + t1 = System.currentTimeMillis(); + c.tableOperations().merge(cloneName, null, null); + t2 = System.currentTimeMillis(); + log.info("Time to merge all tablets : {}ms", t2 - t1); + + // verify data after merge + for (var rowInt : rows) { + var row = String.format("%010d", rowInt); + verifyRow(c, cloneName, row); + } + t1 = System.currentTimeMillis(); c.tableOperations().delete(tableName); t2 = System.currentTimeMillis(); log.info("Time to delete table : {}ms", t2 - t1); + } + } + /** Scans {@code tableName} with the range set to {@code new Range(row)}, collects the entries into a map of column qualifier to value, and asserts that the map equals x=200, y=900, z=300. */ private void verifyRow(AccumuloClient c, String tableName, String row) throws Exception { + try (var scanner = c.createScanner(tableName)) { + scanner.setRange(new Range(row)); + Map<String,String> coords = scanner.stream().collect(Collectors + .toMap(e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); + assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); } }