This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new 7f748e7a4c adds support for compacting lots of tablets (#3945)
7f748e7a4c is described below

commit 7f748e7a4c7fdb2ddc7bfeae4ea0932d086ea829
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Nov 27 17:48:41 2023 -0500

    adds support for compacting lots of tablets (#3945)

    Removes buffering of all conditional results in memory in the compaction
    code. This allows compacting more tablets than would fit in memory.

    Changed the conditional result BiConsumer to a Consumer because the extent
    was being passed twice; the BiConsumer was not needed and made the code
    more verbose.

    Updated the compaction code to collect stats and log at trace. It was
    logging a lot of per-tablet information at debug level. When compacting
    one million tablets, this resulted in a lot of information in the manager
    logs. Moved the per-tablet information to trace logging. Added collection
    of stats for the different per-tablet information and logged the stats
    once for all tablets scanned.

    Added compaction to SplitMillionIT. Without the other changes in this PR,
    adding compaction to SplitMillionIT would cause the Manager to die with an
    out-of-memory error because the conditional writer was buffering all
    1 million tablets.
--- .../accumulo/core/metadata/schema/Ample.java | 6 +- .../AsyncConditionalTabletsMutatorImpl.java | 12 +- .../accumulo/server/metadata/ServerAmpleImpl.java | 3 +- .../accumulo/manager/tableOps/compact/CleanUp.java | 41 +++++-- .../manager/tableOps/compact/CompactionDriver.java | 136 +++++++++++++-------- .../manager/tableOps/delete/ReserveTablets.java | 7 +- .../manager/tableOps/merge/DeleteTablets.java | 6 +- .../manager/tableOps/merge/FinishTableRangeOp.java | 6 +- .../manager/tableOps/merge/ReserveTablets.java | 5 +- .../test/functional/AmpleConditionalWriterIT.java | 4 +- .../accumulo/test/functional/SplitMillionIT.java | 69 +++++++++-- 11 files changed, 194 insertions(+), 101 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 74c493236b..d9bd1b0f07 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Stream; @@ -213,7 +213,7 @@ public interface Ample { * An entry point for updating tablets metadata using a conditional writer. The returned mutator * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called. 
* If buffering everything in memory is undesirable, then consider using - * {@link #conditionallyMutateTablets(BiConsumer)} + * {@link #conditionallyMutateTablets(Consumer)} * * @see ConditionalTabletMutator#submit(RejectionHandler) */ @@ -237,7 +237,7 @@ public interface Ample { * @see ConditionalTabletMutator#submit(RejectionHandler) */ default AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) { + conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { throw new UnsupportedOperationException(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java index 0d31f71eef..d7666361f7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; @@ -32,7 +32,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator { - private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer; + private final Consumer<Ample.ConditionalResult> resultsConsumer; private final ExecutorService executor; private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null; private ConditionalTabletsMutatorImpl bufferingMutator; @@ -41,7 +41,7 @@ public 
class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona public static final int BATCH_SIZE = 1000; AsyncConditionalTabletsMutatorImpl(ServerContext context, - BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) { + Consumer<Ample.ConditionalResult> resultsConsumer) { this.resultsConsumer = Objects.requireNonNull(resultsConsumer); this.bufferingMutator = new ConditionalTabletsMutatorImpl(context); this.context = context; @@ -58,7 +58,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } @@ -85,13 +85,13 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona if (backgroundProcessing != null) { // a previous batch of mutations was submitted for processing so wait on it. 
try { - backgroundProcessing.get().forEach(resultsConsumer); + backgroundProcessing.get().values().forEach(resultsConsumer); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException(e); } } // process anything not processed so far - bufferingMutator.process().forEach(resultsConsumer); + bufferingMutator.process().values().forEach(resultsConsumer); bufferingMutator.close(); executor.shutdownNow(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a42bc5f488..d0c1dd71e3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -96,7 +95,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public AsyncConditionalTabletsMutator - conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) { + conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java index 9b864e3f3f..e1c6e1f504 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -22,11 +22,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; @@ -59,32 +62,46 @@ public class CleanUp extends ManagerRepo { var ample = manager.getContext().getAmple(); + var fateStr = FateTxId.formatTid(tid); + + AtomicLong rejectedCount = new AtomicLong(0); + Consumer<Ample.ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + + long t1, t2, submitted = 0, total = 0; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { - long t1 = System.nanoTime(); + t1 = System.nanoTime(); for (TabletMetadata tablet : tablets) { + total++; if (tablet.getCompacted().contains(tid)) { tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, COMPACTED).deleteCompacted(tid) .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid)); + submitted++; } } - long rejected = tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED).peek(result -> log - .debug("{} update for {} was rejected ", FateTxId.formatTid(tid), result.getExtent())) - .count(); + t2 = 
System.nanoTime(); + } - long t2 = System.nanoTime(); + long scanTime = Duration.ofNanos(t2 - t1).toMillis(); - if (rejected > 0) { - long sleepTime = Duration.ofNanos(t2 - t1).toMillis(); - sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); - return sleepTime; - } + log.debug("{} removed {} of {} compacted markers for {} tablets in {}ms", fateStr, + submitted - rejectedCount.get(), submitted, total, scanTime); + + if (rejectedCount.get() > 0) { + long sleepTime = scanTime; + sleepTime = Math.max(100, Math.min(30000, sleepTime * 2)); + return sleepTime; } return 0; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 21d0f05902..6fd1a969aa 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -29,6 +29,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +48,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -137,18 +140,33 @@ class CompactionDriver extends ManagerRepo { // ELASTICITY_TODO 
use existing compaction logging - try ( - var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var fateStr = FateTxId.formatTid(tid); + + Consumer<Ample.ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + } + }; - int complete = 0; - int total = 0; + long t1 = System.currentTimeMillis(); - int selected = 0; + int complete = 0; + int total = 0; + int opidsSeen = 0; + int noFiles = 0; + int noneSelected = 0; + int alreadySelected = 0; + int otherSelected = 0; + int otherCompaction = 0; + int selected = 0; - KeyExtent minSelected = null; - KeyExtent maxSelected = null; + KeyExtent minSelected = null; + KeyExtent maxSelected = null; + + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), tid); @@ -156,53 +174,54 @@ class CompactionDriver extends ManagerRepo { total++; - // TODO change all logging to trace - if (tablet.getCompacted().contains(tid)) { // this tablet is already considered done - log.debug("{} compaction for {} is complete", FateTxId.formatTid(tid), - tablet.getExtent()); + log.trace("{} compaction for {} is complete", fateStr, tablet.getExtent()); complete++; } else if (tablet.getOperationId() != null) { - log.debug("{} ignoring tablet {} with active operation {} ", FateTxId.formatTid(tid), - tablet.getExtent(), tablet.getOperationId()); + log.trace("{} ignoring tablet {} with active operation {} ", fateStr, tablet.getExtent(), + tablet.getOperationId()); + opidsSeen++; } else if 
(tablet.getFiles().isEmpty()) { - log.debug("{} tablet {} has no files, attempting to mark as compacted ", - FateTxId.formatTid(tid), tablet.getExtent()); + log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateStr, + tablet.getExtent()); // this tablet has no files try to mark it as done tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files - log.debug("{} selecting {} files compaction for {}", FateTxId.formatTid(tid), - tablet.getFiles().size(), tablet.getExtent()); + log.trace("{} selecting {} files compaction for {}", fateStr, tablet.getFiles().size(), + tablet.getExtent()); Set<StoredTabletFile> filesToCompact; try { filesToCompact = CompactionPluginUtils.selectFiles(manager.getContext(), tablet.getExtent(), config, tablet.getFilesMap()); } catch (Exception e) { - log.warn("{} failed to select files for {} using {}", FateTxId.formatTid(tid), - tablet.getExtent(), config.getSelector(), e); + log.warn("{} failed to select files for {} using {}", fateStr, tablet.getExtent(), + config.getSelector(), e); throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Failed to select files"); } - // TODO expensive logging - log.debug("{} selected {} of {} files for {}", FateTxId.formatTid(tid), - filesToCompact.stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getFiles().stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList()), - tablet.getExtent()); - + if (log.isTraceEnabled()) { + log.trace("{} selected {} of {} files for {}", fateStr, + filesToCompact.stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), 
+ tablet.getFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList()), + tablet.getExtent()); + } if (filesToCompact.isEmpty()) { // no files were selected so mark the tablet as compacted tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid) .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid)); + + noneSelected++; } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED); @@ -228,39 +247,42 @@ class CompactionDriver extends ManagerRepo { } else if (tablet.getSelectedFiles() != null) { if (tablet.getSelectedFiles().getFateTxId() == tid) { - log.debug( + log.trace( "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size()); + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size()); + alreadySelected++; } else { - log.debug( + log.trace( "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", - FateTxId.formatTid(tid), tablet.getExtent(), - tablet.getSelectedFiles().getFiles().size(), + fateStr, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + otherSelected++; } } else { // ELASTICITY_TODO if there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting + otherCompaction++; } } + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } - tabletsMutator.process().values().stream() - .filter(result -> result.getStatus() == Status.REJECTED) - .forEach(result -> log.debug("{} update for {} was rejected ", FateTxId.formatTid(tid), - result.getExtent())); + long t2 = 
System.currentTimeMillis(); - if (selected > 0) { - manager.getEventCoordinator().event( - new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), - "%s selected files for compaction for %d tablets", FateTxId.formatTid(tid), selected); - } + log.debug("{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{}" + + " selected_by_other:{} no_files:{} none_selected:{} other_compaction:{} opids:{} scan_update_time:{}ms", + fateStr, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected, + otherCompaction, opidsSeen, t2 - t1); - return total - complete; - } catch (InterruptedException | KeeperException e) { - throw new RuntimeException(e); + if (selected > 0) { + manager.getEventCoordinator().event( + new KeyExtent(tableId, maxSelected.endRow(), minSelected.prevEndRow()), + "%s selected files for compaction for %d tablets", fateStr, selected); } + return total - complete; + // ELASTICITIY_TODO need to handle seeing zero tablets } @@ -293,12 +315,22 @@ class CompactionDriver extends ManagerRepo { .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) .logInterval(3, MINUTES).createRetry(); + var fateStr = FateTxId.formatTid(tid); + while (!allCleanedUp) { + AtomicLong rejectedCount = new AtomicLong(0); + Consumer<Ample.ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateStr, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build(); - var tabletsMutator = ample.conditionallyMutateTablets()) { + var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { Predicate<TabletMetadata> needsUpdate = tabletMetadata -> (tabletMetadata.getSelectedFiles() != null && tabletMetadata.getSelectedFiles().getFateTxId() == tid) @@ -322,14 +354,12 @@ class 
CompactionDriver extends ManagerRepo { mutator.submit(needsNoUpdate::test); } } - - allCleanedUp = tabletsMutator.process().values().stream() - .allMatch(result -> result.getStatus() == Status.ACCEPTED); } + allCleanedUp = rejectedCount.get() == 0; + if (!allCleanedUp) { - retry.waitForNextAttempt(log, - "Cleanup metadata for failed compaction " + FateTxId.formatTid(tid)); + retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateStr); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index afdd5fc155..79ffc5e542 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -23,11 +23,10 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletOperationId; @@ -58,11 +57,11 @@ public class ReserveTablets extends ManagerRepo { // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); - BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> { + Consumer<Ample.ConditionalResult> resultsConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { accepted.incrementAndGet(); } else { - log.debug("Failed to set 
operation id {} {}", opid, extent); + log.debug("Failed to set operation id {} {}", opid, result.getExtent()); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index f34708681a..d34a8f8355 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -20,7 +20,7 @@ package org.apache.accumulo.manager.tableOps.merge; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -66,11 +66,11 @@ public class DeleteTablets extends ManagerRepo { AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + Consumer<Ample.ConditionalResult> resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 9ae16c1ac4..c58efca885 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -23,7 +23,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; @@ -68,11 +68,11 @@ class FinishTableRangeOp extends ManagerRepo { AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); // delete tablets - BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + Consumer<Ample.ConditionalResult> resultConsumer = result -> { if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { acceptedCount.incrementAndGet(); } else { - log.error("{} failed to update {}", fateStr, extent); + log.error("{} failed to update {}", fateStr, result.getExtent()); rejectedCount.incrementAndGet(); } }; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index 3a4e742d7c..e2fd71e65b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -24,9 +24,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.schema.Ample; @@ -59,7 +58,7 @@ public class ReserveTablets extends ManagerRepo { var opid 
= TabletOperationId.from(TabletOperationType.MERGING, tid); AtomicLong opsAccepted = new AtomicLong(0); - BiConsumer<KeyExtent,Ample.ConditionalResult> resultConsumer = (extent, result) -> { + Consumer<Ample.ConditionalResult> resultConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { opsAccepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index e81e6856e5..325970c03f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -49,7 +49,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -913,7 +913,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { AtomicLong accepted = new AtomicLong(0); AtomicLong total = new AtomicLong(0); - BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> { + Consumer<Ample.ConditionalResult> resultsConsumer = result -> { if (result.getStatus() == Status.ACCEPTED) { accepted.incrementAndGet(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index 3bd25e4733..3ed25d330f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -20,6 +20,8 @@ package org.apache.accumulo.test.functional; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.HashMap; +import java.util.List; 
import java.util.Map; import java.util.Random; import java.util.SortedSet; @@ -29,9 +31,14 @@ import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CloneConfiguration; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -42,11 +49,20 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class SplitMillionIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(SplitMillionIT.class); + + public static class XFilter extends Filter { + + @Override + public boolean accept(Key k, Value v) { + return !k.getColumnQualifierData().toString().equals("x"); + } + } + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, justification = "predictable random is ok for testing") @Test public void testOneMillionTablets() throws Exception { - Logger log = LoggerFactory.getLogger(SplitIT.class); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0]; @@ -96,7 +112,7 @@ public class SplitMillionIT extends AccumuloClusterHarness { } long t3 = System.currentTimeMillis(); - verifyRow(c, tableName, row); + verifyRow(c, tableName, row, Map.of("x", "200", "y", "900", "z", "300")); long t4 = System.currentTimeMillis(); log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); } @@ -105,7 +121,18 @@ public class SplitMillionIT extends AccumuloClusterHarness { long count 
= c.tableOperations().getTabletInformation(tableName, new Range()).count(); long t2 = System.currentTimeMillis(); assertEquals(1_000_000, count); - log.info("Time to scan all tablets : {}ms", t2 - t1); + log.info("Time to scan all tablets information : {}ms", t2 - t1); + + t1 = System.currentTimeMillis(); + var iterSetting = new IteratorSetting(100, XFilter.class); + c.tableOperations().compact(tableName, + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true).setFlush(true)); + t2 = System.currentTimeMillis(); + assertEquals(1_000_000, count); + log.info("Time to compact all tablets : {}ms", t2 - t1); + + var expected = Map.of("y", "900", "z", "300"); + vefifyData(rows, c, tableName, expected); // clone the table to test cloning with lots of tablets and also to give merge its own table // to work on @@ -114,6 +141,7 @@ public class SplitMillionIT extends AccumuloClusterHarness { c.tableOperations().clone(tableName, cloneName, CloneConfiguration.builder().build()); t2 = System.currentTimeMillis(); log.info("Time to clone table : {}ms", t2 - t1); + vefifyData(rows, c, cloneName, expected); // merge the clone, so that delete table can run later on tablet with lots and lots of tablets t1 = System.currentTimeMillis(); @@ -121,11 +149,7 @@ public class SplitMillionIT extends AccumuloClusterHarness { t2 = System.currentTimeMillis(); log.info("Time to merge all tablets : {}ms", t2 - t1); - // verify data after merge - for (var rowInt : rows) { - var row = String.format("%010d", rowInt); - verifyRow(c, cloneName, row); - } + vefifyData(rows, c, cloneName, expected); t1 = System.currentTimeMillis(); c.tableOperations().delete(tableName); @@ -134,12 +158,37 @@ public class SplitMillionIT extends AccumuloClusterHarness { } } - private void verifyRow(AccumuloClient c, String tableName, String row) throws Exception { + private void vefifyData(int[] rows, AccumuloClient c, String tableName, + Map<String,String> expected) throws Exception { + // use a batch 
scanner so that many hosting request can be submitted at the same time + long t1 = System.currentTimeMillis(); + try (var scanner = c.createBatchScanner(tableName)) { + var ranges = IntStream.of(rows).mapToObj(row -> String.format("%010d", row)).map(Range::new) + .collect(Collectors.toList()); + scanner.setRanges(ranges); + Map<String,Map<String,String>> allCoords = new HashMap<>(); + scanner.forEach((k, v) -> { + var row = k.getRowData().toString(); + var qual = k.getColumnQualifierData().toString(); + var val = v.toString(); + allCoords.computeIfAbsent(row, r -> new HashMap<>()).put(qual, val); + }); + + assertEquals(IntStream.of(rows).mapToObj(row -> String.format("%010d", row)) + .collect(Collectors.toSet()), allCoords.keySet()); + allCoords.values().forEach(coords -> assertEquals(expected, coords)); + } + long t2 = System.currentTimeMillis(); + log.info("Time to verify {} rows was {}ms", rows.length, t2 - t1); + } + + private void verifyRow(AccumuloClient c, String tableName, String row, + Map<String,String> expected) throws Exception { try (var scanner = c.createScanner(tableName)) { scanner.setRange(new Range(row)); Map<String,String> coords = scanner.stream().collect(Collectors .toMap(e -> e.getKey().getColumnQualifier().toString(), e -> e.getValue().toString())); - assertEquals(Map.of("x", "200", "y", "900", "z", "300"), coords); + assertEquals(expected, coords); } }