This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 359712e526 Process seeding of split fate operations in batches (#5404) 359712e526 is described below commit 359712e526b4760862574adcaa6c178c853662dc Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Mar 21 09:18:40 2025 -0400 Process seeding of split fate operations in batches (#5404) Updates the Seeder in the Manager that handles seeding split fate ops to use a single thread and to submit multiple outstanding operations to be seeded together instead of individually in order to improve performance. The user fate store will now track outstanding fate operations and return a future for each pending operation that will be completed when the batch is submitted. This closes #5160 --- .../org/apache/accumulo/core/conf/Property.java | 4 - .../java/org/apache/accumulo/core/fate/Fate.java | 5 + .../org/apache/accumulo/core/fate/FateStore.java | 11 +- .../accumulo/core/fate/user/FateMutator.java | 3 + .../accumulo/core/fate/user/FateMutatorImpl.java | 5 + .../accumulo/core/fate/user/UserFateStore.java | 165 +++++++++++++++++---- .../java/org/apache/accumulo/manager/Manager.java | 2 +- .../accumulo/manager/TabletGroupWatcher.java | 3 +- .../accumulo/manager/split/SeedSplitTask.java | 55 ------- .../apache/accumulo/manager/split/Splitter.java | 102 +++++++++---- .../apache/accumulo/test/fate/FateStoreUtil.java | 3 - 11 files changed, 225 insertions(+), 133 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 9687d2b7f6..62ae03cf69 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -466,10 +466,6 @@ public enum Property { + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), - MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT, - "The number of threads used to seed fate split task, the actual split work is done by fate" - + " threads.", - "4.0.0"), MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", "1M", PropertyType.MEMORY, "The data size of each resource groups compaction job priority queue. The memory size of " diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 93f4218119..60d3f427c0 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationExcepti import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.FateStore.Seeder; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.manager.thrift.TFateOperation; @@ -539,6 +540,10 @@ public class Fate<T> { return store.create(); } + public Seeder<T> beginSeeding() { + return store.beginSeeding(); + } + public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { try (var seeder = store.beginSeeding()) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index c7ec3b4e4c..3f5a8ec040 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -56,10 +56,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { * Attempts to seed a transaction with the given repo if it does not exist. A fateId will be * derived from the fateKey. If seeded, sets the following data for the fateId in the store. * - * TODO: Support completing futures later in close method The current version will always return - * with a CompleteableFuture that is already completed. Future version will process will - * complete in the close() method for the User store. - * * <ul> * <li>Set the fate op</li> * <li>Set the status to SUBMITTED</li> @@ -76,15 +72,12 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp); - // TODO: Right now all implementations do nothing - // Eventually this would check the status of all added conditional mutations, - // retry unknown, and then close the conditional writer. @Override void close(); } - // Creates a conditional writer for the user fate store. For Zookeeper all this code will probably - // do the same thing its currently doing as zookeeper does not support multi-node operations. + // Creates a conditional writer for the user fate store. For Zookeeper this will be a no-op + // because currently zookeeper does not support multi-node operations. Seeder<T> beginSeeding(); /** diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 25fc8fdd47..0280dbf749 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.fate.user; +import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; @@ -101,4 +102,6 @@ public interface FateMutator<T> { Status tryMutate(); + ConditionalMutation getMutation(); + } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index b742361ccf..bb33f6ea81 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -260,4 +260,9 @@ public class FateMutatorImpl<T> implements FateMutator<T> { throw new RuntimeException(e); } } + + @Override + public ConditionalMutation getMutation() { + return mutation; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 466c771d1e..2a6efbed0d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -21,19 +21,26 @@ package org.apache.accumulo.core.fate.user; import java.io.IOException; import java.io.Serializable; import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -47,9 +54,11 @@ import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateKey.FateKeyType; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.user.FateMutator.Status; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; @@ -57,6 +66,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -136,36 +146,14 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public Seeder<T> beginSeeding() { - // TODO: For now can handle seeding 1 transaction at a time so just process - // everything in attemptToSeedTransaction - // Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined - // into one conditional mutation and we will need to track the pending operations - // and futures in a map - return new Seeder<T>() { - @Override - public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, - FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { - return CompletableFuture - .completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp)); - } - - @Override - public void close() { - // TODO: This will be used in Part 2 of #5160 - } - }; + return new BatchSeeder(); } - private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, - boolean autoCleanUp) { - final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId, + Repo<T> repo, boolean autoCleanUp) { Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent() .putKey(fateKey).putCreateTime(System.currentTimeMillis()); - if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) { - return Optional.of(fateId); - } else { - return Optional.empty(); - } + return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); } @Override @@ -176,16 +164,22 @@ public class UserFateStore<T> extends AbstractFateStore<T> { return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp); } + private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory, + Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { + var mutator = mutatorFactory.get(); + mutator = + mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); + if (autoCleanUp) { + mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); + } + return mutator; + } + private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId, Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { + var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); int maxAttempts = 5; for (int attempt = 0; attempt < maxAttempts; attempt++) { - var mutator = mutatorFactory.get(); - mutator = - mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); - if (autoCleanUp) { - mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); - } var status = mutator.tryMutate(); if (status == FateMutator.Status.ACCEPTED) { // signal to the super class that a new fate transaction was seeded and is ready to run @@ -393,6 +387,113 @@ public class UserFateStore<T> extends AbstractFateStore<T> { return fateInstanceType; } + private class BatchSeeder implements Seeder<T> { + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending = + new HashMap<>(); + + @Override + public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, + FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { + Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder."); + + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + // If not already submitted, add to the pending list and return the future + // or the existing future if duplicate. The pending map will store the mutator + // to be processed on close in a one batch. + return pending.computeIfAbsent(fateId, id -> { + FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp); + CompletableFuture<Optional<FateId>> future = new CompletableFuture<>(); + return new Pair<>(mutator, future); + }).getSecond(); + } + + @Override + public void close() { + closed.set(true); + + int maxAttempts = 5; + + // This loop will submit all the pending mutations as one batch + // to a conditional writer and any known results will be removed + // from the pending map. Unknown results will be re-attempted up + // to the maxAttempts count + for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) { + var currentResults = tryMutateBatch(); + for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) { + var fateId = result.getKey(); + var status = result.getValue(); + var future = pending.get(fateId).getSecond(); + switch (result.getValue()) { + case ACCEPTED: + seededTx(); + log.trace("Attempt to seed {} returned {}", fateId.canonical(), status); + // Complete the future with the fatId and remove from pending + future.complete(Optional.of(fateId)); + pending.remove(fateId); + break; + case REJECTED: + log.debug("Attempt to seed {} returned {}", fateId.canonical(), status); + // Rejected so complete with an empty optional and remove from pending + future.complete(Optional.empty()); + pending.remove(fateId); + break; + case UNKNOWN: + log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(), + status); + // unknown, so don't remove from map so that we try again if still under + // max attempts + break; + default: + // do not expect other statuses + throw new IllegalStateException("Unhandled status for mutation " + status); + } + } + + if (!pending.isEmpty()) { + // At this point can not reliably determine if the unknown pending mutations were + // successful or not because no reservation was acquired. For example since no + // reservation was acquired it is possible that seeding was a success and something + // immediately picked it up and started operating on it and changing it. + // If scanning after that point can not conclude success or failure. Another situation + // is that maybe the fateId already existed in a seeded form prior to getting this + // unknown. + UtilWaitThread.sleep(250); + } + } + + // Any remaining will be UNKNOWN status, so complete the futures with an optional empty + pending.forEach((fateId, pair) -> { + pair.getSecond().complete(Optional.empty()); + log.warn("Repeatedly received unknown status when attempting to seed {}", + fateId.canonical()); + }); + } + + // Submit all the pending mutations to a single conditional writer + // as one batch and return the results for each mutation + private Map<FateId,ConditionalWriter.Status> tryMutateBatch() { + if (pending.isEmpty()) { + return Map.of(); + } + + final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>(); + try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { + Iterator<ConditionalWriter.Result> results = writer + .write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator()); + while (results.hasNext()) { + var result = results.next(); + var row = new Text(result.getMutation().getRow()); + resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus()); + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new IllegalStateException(e); + } + return resultsMap; + } + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl { private FateTxStoreImpl(FateId fateId) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5da833ee84..94ac0e76b5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1337,7 +1337,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete. compactionCoordinator.start(); - this.splitter = new Splitter(context); + this.splitter = new Splitter(this); this.splitter.start(); try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e29413e82a..fe39e60381 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -75,7 +75,6 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; -import org.apache.accumulo.manager.split.SeedSplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; @@ -607,7 +606,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING); if (needsSplit) { LOG.debug("{} may need splitting.", tm.getExtent()); - manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); + manager.getSplitter().initiateSplit(tm.getExtent()); } if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java deleted file mode 100644 index 8270bc423f..0000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.split; - -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateKey; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.split.FindSplits; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SeedSplitTask implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(SeedSplitTask.class); - private final Manager manager; - private final KeyExtent extent; - - public SeedSplitTask(Manager manager, KeyExtent extent) { - this.manager = manager; - this.extent = extent; - } - - @Override - public void run() { - try { - var fateInstanceType = FateInstanceType.fromTableId((extent.tableId())); - manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT, - FateKey.forSplit(extent), new FindSplits(extent), true); - } catch (Exception e) { - log.error("Failed to split {}", extent, e); - } - } - - public KeyExtent getExtent() { - return extent; - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 85b841d1cf..d88e52ed66 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -18,22 +18,28 @@ */ package org.apache.accumulo.manager.split; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.hadoop.fs.FileSystem; @@ -49,9 +55,64 @@ public class Splitter { private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); + private final Manager manager; private final ThreadPoolExecutor splitExecutor; // tracks which tablets are queued in splitExecutor - private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet(); + private final Map<Text,KeyExtent> queuedTablets = new ConcurrentHashMap<>(); + + class SplitWorker implements Runnable { + + @Override + public void run() { + try { + while (manager.stillManager()) { + if (queuedTablets.isEmpty()) { + sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + continue; + } + + final Map<Text,KeyExtent> userSplits = new HashMap<>(); + final Map<Text,KeyExtent> metaSplits = new HashMap<>(); + + // Go through all the queued up splits and partition + // into the different store types to be submitted. + queuedTablets.forEach((metaRow, extent) -> { + switch (FateInstanceType.fromTableId((extent.tableId()))) { + case USER: + userSplits.put(metaRow, extent); + break; + case META: + metaSplits.put(metaRow, extent); + break; + default: + throw new IllegalStateException("Unexpected FateInstanceType"); + } + }); + + // see the user and then meta splits + // The meta plits (zk) will be processed one at a time but there will not be + // many of those splits. The user splits are processed as a batch. + seedSplits(FateInstanceType.USER, userSplits); + seedSplits(FateInstanceType.META, metaSplits); + } + } catch (Exception e) { + LOG.error("Failed to split", e); + } + } + } + + private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> splits) { + if (!splits.isEmpty()) { + try (var seeder = manager.fate(instanceType).beginSeeding()) { + for (KeyExtent extent : splits.values()) { + var unused = seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT, + FateKey.forSplit(extent), new FindSplits(extent), true); + } + } finally { + queuedTablets.keySet().removeAll(splits.keySet()); + } + } + } public static class FileInfo { final Text firstRow; @@ -151,12 +212,12 @@ public class Splitter { final LoadingCache<CacheKey,FileInfo> splitFileCache; - public Splitter(ServerContext context) { - int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); + public Splitter(Manager manager) { + this.manager = manager; + ServerContext context = manager.getContext(); - this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder") - .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, TimeUnit.MILLISECONDS) - .enableThreadPoolMetrics().build(); + this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1) + .numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build(); Weigher<CacheKey, FileInfo> weigher = (key, info) -> key.tableId.canonical().length() @@ -175,7 +236,9 @@ public class Splitter { } - public synchronized void start() {} + public synchronized void start() { + splitExecutor.execute(new SplitWorker()); + } public synchronized void stop() { splitExecutor.shutdownNow(); @@ -185,29 +248,14 @@ public class Splitter { return splitFileCache.get(new CacheKey(tableId, tabletFile)); } - public void initiateSplit(SeedSplitTask seedSplitTask) { + public void initiateSplit(KeyExtent extent) { // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev // end row which may change when splits happen. The metaRow is conceptually tableId+endRow and // that does not change for a split. - Text metaRow = seedSplitTask.getExtent().toMetaRow(); + Text metaRow = extent.toMetaRow(); int qsize = queuedTablets.size(); - if (qsize < 10_000 && queuedTablets.add(metaRow)) { - Runnable taskWrapper = () -> { - try { - seedSplitTask.run(); - } finally { - queuedTablets.remove(metaRow); - } - }; - - try { - splitExecutor.execute(taskWrapper); - } catch (RejectedExecutionException rje) { - queuedTablets.remove(metaRow); - throw rje; - } - } else { + if (qsize >= 10_000 || queuedTablets.putIfAbsent(metaRow, extent) != null) { LOG.trace("Did not add {} to split queue {}", metaRow, qsize); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java index 61ea073a6f..5b12b0f3cd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java @@ -77,9 +77,6 @@ public class FateStoreUtil { assertEquals(fateTableProps, testFateTableProps); } - // For now just process one at a time as the current impl completes - // each seed transaction individually. In future versions we can test - // batching multiple seeding atempts together. public static <T> Optional<FateId> seedTransaction(FateStore<T> store, Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { CompletableFuture<Optional<FateId>> fateIdFuture;