This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5d8d9cd57f moves finding split points into FATE (#4178) 5d8d9cd57f is described below commit 5d8d9cd57ffcfdab21f42fb4b4a9dfa36a16cd26 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Feb 21 17:37:20 2024 -0500 moves finding split points into FATE (#4178) This commit makes the following changes: 1. Moves finding split points for tablets from a custom thread pool in the manager into FATE. This is done by the new Repo named FindSplits. 2. Stops tracking what splits are running in manager memory and moves this into FATE. This is not implemented yet; the plan is to do that in another PR. 3. Stops tracking what tablets are unsplittable in manager memory. This should be tracked in the metadata table per #4177. This change can be in its own commit. In this commit the in-memory tracking is simply removed. These changes will reduce manager memory usage. These changes will also make splitting tablets work much better if FATE is distributed, for two reasons. First, it allows the computation to spread over multiple manager processes. Second, it allows tracking of what is splitting to move from memory to persisted storage and not rely on a map in a single process. 
--- .../java/org/apache/accumulo/core/fate/Fate.java | 58 +++++++--- .../apache/accumulo/core/util/cache/Caches.java | 1 - .../accumulo/core/util/threads/ThreadPools.java | 3 - .../accumulo/manager/TabletGroupWatcher.java | 14 +-- .../accumulo/manager/split/SeedSplitTask.java | 62 +++++++++++ .../apache/accumulo/manager/split/SplitTask.java | 98 ---------------- .../apache/accumulo/manager/split/Splitter.java | 122 ++++---------------- .../manager/tableOps/split/FindSplits.java | 87 +++++++++++++++ .../accumulo/manager/tableOps/split/PreSplit.java | 7 +- .../accumulo/manager/split/SplitterTest.java | 123 --------------------- 10 files changed, 214 insertions(+), 361 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 3828bb80c4..0b82f73f11 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -57,6 +57,8 @@ import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * Fault tolerant executor */ @@ -334,6 +336,44 @@ public class Fate<T> { return store.create(); } + public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, + boolean autoCleanUp, String goalMessage) { + + Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey); + + return optTxStore.map(txStore -> { + var fateId = txStore.getID(); + try { + Preconditions.checkState(txStore.getStatus() == NEW); + seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); + } finally { + txStore.unreserve(0, MILLISECONDS); + } + return fateId; + }); + } + + private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp, + String goalMessage, FateTxStore<T> txStore) { + if (txStore.top() == null) { + try { + log.info("Seeding {} {}", fateId, goalMessage); + 
txStore.push(repo); + } catch (StackOverflowException e) { + // this should not happen + throw new IllegalStateException(e); + } + } + + if (autoCleanUp) { + txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); + } + + txStore.setTransactionInfo(TxInfo.TX_NAME, txName); + + txStore.setStatus(SUBMITTED); + } + // start work in the transaction.. it is safe to call this // multiple times for a transaction... but it will only seed once public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp, @@ -341,23 +381,7 @@ public class Fate<T> { FateTxStore<T> txStore = store.reserve(fateId); try { if (txStore.getStatus() == NEW) { - if (txStore.top() == null) { - try { - log.info("Seeding {} {}", fateId, goalMessage); - txStore.push(repo); - } catch (StackOverflowException e) { - // this should not happen - throw new IllegalStateException(e); - } - } - - if (autoCleanUp) { - txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); - } - - txStore.setTransactionInfo(TxInfo.TX_NAME, txName); - - txStore.setStatus(SUBMITTED); + seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); } } finally { txStore.unreserve(0, TimeUnit.MILLISECONDS); diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java index 3c37b68067..39a96225d4 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java +++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java @@ -45,7 +45,6 @@ public class Caches implements MetricsProducer { COMPRESSION_ALGORITHM, CRYPT_PASSWORDS, HOST_REGEX_BALANCER_TABLE_REGEX, - HOSTING_REQUEST_CACHE, INSTANCE_ID, NAMESPACE_ID, PROP_CACHE, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index cf364f8722..ad5aa4710b 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -280,9 +280,6 @@ public class ThreadPools { "summary partition", emitThreadPoolMetrics); case GC_DELETE_THREADS: return createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics); - case MANAGER_SPLIT_WORKER_THREADS: - return createFixedThreadPool(conf.getCount(p), "tablet split inspection", - emitThreadPoolMetrics); default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 9f1deac81e..e8bdd794a4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -73,7 +73,7 @@ import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; -import org.apache.accumulo.manager.split.SplitTask; +import org.apache.accumulo.manager.split.SeedSplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; @@ -521,17 +521,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { if (actions.contains(ManagementAction.NEEDS_SPLITTING)) { LOG.debug("{} may need splitting.", tm.getExtent()); - if (manager.getSplitter().isSplittable(tm)) { - if (manager.getSplitter().addSplitStarting(tm.getExtent())) { - LOG.debug("submitting tablet {} for split", tm.getExtent()); - manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); - } - } else { - LOG.debug("{} is not 
splittable.", tm.getExtent()); - } - // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to - // show where merge used to make a call to split a tablet. - // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); + manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); } if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java new file mode 100644 index 0000000000..f63047ee54 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.split; + +import java.util.Optional; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.split.FindSplits; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SeedSplitTask implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(SeedSplitTask.class); + private final Manager manager; + private KeyExtent extent; + + public SeedSplitTask(Manager manager, KeyExtent extent) { + this.manager = manager; + this.extent = extent; + } + + @Override + public void run() { + try { + var fateInstanceType = FateInstanceType.fromTableId((extent.tableId())); + + Optional<FateId> optFateId = + manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", FateKey.forSplit(extent), + new FindSplits(extent), true, "System initiated split of tablet " + extent); + + optFateId.ifPresentOrElse(fateId -> { + log.trace("System initiated a split for : {} {}", extent, fateId); + }, () -> { + log.trace("System attempted to initiate a split but one was in progress : {}", extent); + }); + + } catch (Exception e) { + log.error("Failed to split {}", extent, e); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java deleted file mode 100644 index 999de0f7e9..0000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.split; - -import java.time.Duration; -import java.util.SortedSet; - -import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.split.PreSplit; -import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SplitTask implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(SplitTask.class); - private final Manager manager; - - private final ServerContext context; - private TabletMetadata tablet; - private final long creationTime; - - public SplitTask(ServerContext context, TabletMetadata tablet, Manager manager) { - this.context = context; - this.tablet = tablet; - this.manager = manager; - this.creationTime = System.nanoTime(); - } - - @Override - public void run() { - try { - if (Duration.ofNanos(System.nanoTime() - creationTime).compareTo(Duration.ofMinutes(2)) > 0) { - // the tablet was in the thread pool queue for a bit, lets reread its metadata - tablet = manager.getContext().getAmple().readTablet(tablet.getExtent()); - if (tablet == null) { - // the tablet no longer exists - return; - } - } - - if (tablet.getOperationId() 
!= null) { - // This will be checked in the FATE op, but no need to inspect files and start a FATE op if - // it currently has an operation running against it. - log.debug("Not splitting {} because it has operation id {}", tablet.getExtent(), - tablet.getOperationId()); - manager.getSplitter().removeSplitStarting(tablet.getExtent()); - return; - } - - var extent = tablet.getExtent(); - - SortedSet<Text> splits = SplitUtils.findSplits(context, tablet); - - if (tablet.getEndRow() != null) { - splits.remove(tablet.getEndRow()); - } - - if (splits.size() == 0) { - log.info("Tablet {} needs to split, but no split points could be found.", - tablet.getExtent()); - - manager.getSplitter().rememberUnsplittable(tablet); - manager.getSplitter().removeSplitStarting(tablet.getExtent()); - return; - } - - var fateInstanceType = FateInstanceType.fromTableId((tablet.getTableId())); - FateId fateId = manager.fate(fateInstanceType).startTransaction(); - - manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateId, - new PreSplit(extent, splits), true, - "System initiated split of tablet " + extent + " into " + splits.size() + " splits"); - } catch (Exception e) { - log.error("Failed to split {}", tablet.getExtent(), e); - } - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index aeeebea780..3acdfe13bf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -18,39 +18,29 @@ */ package org.apache.accumulo.manager.split; -import static java.nio.charset.StandardCharsets.UTF_8; - import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import 
java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.util.FileUtil; import org.apache.accumulo.server.util.FileUtil.FileInfo; -import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Weigher; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; public class Splitter { - private final ExecutorService splitExecutor; - - Cache<KeyExtent,KeyExtent> splitsStarting; - - Cache<KeyExtent,HashCode> unsplittable; + private final ThreadPoolExecutor splitExecutor; private static class CacheKey { @@ -84,52 +74,34 @@ public class Splitter { LoadingCache<CacheKey,FileInfo> splitFileCache; - public static int weigh(KeyExtent keyExtent) { - int size = 0; - size += keyExtent.tableId().toString().length(); - if (keyExtent.endRow() != null) { - size += keyExtent.endRow().getLength(); - } - if (keyExtent.prevEndRow() != null) { - size += keyExtent.prevEndRow().getLength(); - } - return size; - } - public Splitter(ServerContext context) { - this.splitExecutor = context.threadPools().createExecutorService(context.getConfiguration(), - Property.MANAGER_SPLIT_WORKER_THREADS, true); + int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); + // Set up thread pool that constrains the amount of task it queues and when full discards task. 
+ // The purpose of this is to avoid reading lots of data into memory if lots of tablets need to + // split. + BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000); + this.splitExecutor = context.threadPools().createThreadPool(numThreads, numThreads, 0, + TimeUnit.MILLISECONDS, "split_seeder", queue, true); + + // Discard task when the queue is full, this allows the TGW to continue processing task other + // than splits. + this.splitExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); Weigher<CacheKey, FileInfo> weigher = (key, info) -> key.tableId.canonical().length() + key.tabletFile.getPath().toString().length() + info.getFirstRow().getLength() + info.getLastRow().getLength(); - CacheLoader<CacheKey,FileInfo> loader = new CacheLoader<>() { - @Override - public FileInfo load(CacheKey key) throws Exception { - TableConfiguration tableConf = context.getTableConfiguration(key.tableId); - return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) - .get(key.tabletFile); - } + CacheLoader<CacheKey,FileInfo> loader = key -> { + TableConfiguration tableConf = context.getTableConfiguration(key.tableId); + return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) + .get(key.tabletFile); }; splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true) .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher) .build(loader); - Weigher<KeyExtent,KeyExtent> weigher2 = (keyExtent, keyExtent2) -> weigh(keyExtent); - - // Tracks splits starting, but not forever in case something in the code does not remove it. 
- splitsStarting = context.getCaches().createNewBuilder(CacheName.SPLITTER_STARTING, true) - .expireAfterAccess(3, TimeUnit.HOURS).maximumWeight(10_000_000L).weigher(weigher2).build(); - - Weigher<KeyExtent,HashCode> weigher3 = (keyExtent, hc) -> { - return weigh(keyExtent) + hc.bits() / 8; - }; - - unsplittable = context.getCaches().createNewBuilder(CacheName.SPLITTER_UNSPLITTABLE, true) - .expireAfterAccess(24, TimeUnit.HOURS).maximumWeight(10_000_000L).weigher(weigher3).build(); } public synchronized void start() {} @@ -142,59 +114,7 @@ public class Splitter { return splitFileCache.get(new CacheKey(tableId, tabletFile)); } - private HashCode caclulateFilesHash(TabletMetadata tabletMetadata) { - var hasher = Hashing.goodFastHash(128).newHasher(); - tabletMetadata.getFiles().stream().map(StoredTabletFile::getNormalizedPathStr).sorted() - .forEach(path -> hasher.putString(path, UTF_8)); - return hasher.hash(); - } - - /** - * This tablet met the criteria for split but inspection could not find a split point. Remember - * this to avoid wasting time on future inspections until its files change. - */ - public void rememberUnsplittable(TabletMetadata tablet) { - unsplittable.put(tablet.getExtent(), caclulateFilesHash(tablet)); - } - - /** - * If tablet has not been marked as unsplittable, or file set has changed since being marked - * splittable, then return true. Else false. - */ - public boolean isSplittable(TabletMetadata tablet) { - if (splitsStarting.getIfPresent(tablet.getExtent()) != null) { - return false; - } - - var hashCode = unsplittable.getIfPresent(tablet.getExtent()); - - if (hashCode != null) { - if (hashCode.equals(caclulateFilesHash(tablet))) { - return false; - } else { - // We know that the list of files for this tablet have changed - // so we can remove it from the set of unsplittable tablets. 
- unsplittable.invalidate(tablet.getExtent()); - } - } - - return true; - } - - /** - * Temporarily remember that the process of splitting is starting for this tablet making - * {@link #isSplittable(TabletMetadata)} return false in the future. - */ - public boolean addSplitStarting(KeyExtent extent) { - Objects.requireNonNull(extent); - return splitsStarting.asMap().put(extent, extent) == null; - } - - public void removeSplitStarting(KeyExtent extent) { - splitsStarting.invalidate(extent); - } - - public void executeSplit(SplitTask splitTask) { - splitExecutor.execute(splitTask); + public void initiateSplit(SeedSplitTask seedSplitTask) { + splitExecutor.execute(seedSplitTask); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java new file mode 100644 index 0000000000..695f5650cc --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.split; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.split.SplitUtils; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FindSplits extends ManagerRepo { + + private static final long serialVersionUID = 1L; + + private static final Logger log = LoggerFactory.getLogger(PreSplit.class); + private final SplitInfo splitInfo; + + public FindSplits(KeyExtent extent) { + this.splitInfo = new SplitInfo(extent, new TreeSet<>()); + } + + @Override + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + var extent = splitInfo.getOriginal(); + var tabletMetadata = manager.getContext().getAmple().readTablet(extent); + + if (tabletMetadata == null) { + log.trace("Table {} no longer exist, so not gonna try to find a split point for it", extent); + return null; + } + + if (tabletMetadata.getOperationId() != null) { + log.debug("Not splitting {} because it has operation id {}", tabletMetadata.getExtent(), + tabletMetadata.getOperationId()); + return null; + } + + if (!tabletMetadata.getLogs().isEmpty()) { + // This code is only called by system initiated splits, so if walogs are present it probably + // makes sense to wait for the data in them to be written to a file before finding splits + // points. 
+ log.debug("Not splitting {} because it has walogs {}", tabletMetadata.getExtent(), + tabletMetadata.getLogs().size()); + } + + SortedSet<Text> splits = SplitUtils.findSplits(manager.getContext(), tabletMetadata); + + if (extent.endRow() != null) { + splits.remove(extent.endRow()); + } + + if (splits.isEmpty()) { + log.info("Tablet {} needs to split, but no split points could be found.", + tabletMetadata.getExtent()); + // ELASTICITY_TODO record the fact that tablet is un-splittable in metadata table in a new + // column. Record the config used to reach this decision and a hash of the file. The tablet + // mgmt iterator can inspect this column and only try to split the tablet when something has + // changed. + return null; + } + + return new PreSplit(extent, splits); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index f94afbdb1e..d956821d18 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -129,8 +129,6 @@ public class PreSplit extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { - manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); - TabletMetadata tabletMetadata = manager.getContext().getAmple() .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID, LOGS); @@ -171,8 +169,5 @@ public class PreSplit extends ManagerRepo { } @Override - public void undo(FateId fateId, Manager manager) throws Exception { - // TODO is this called if isReady fails? 
- manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); - } + public void undo(FateId fateId, Manager manager) throws Exception {} } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java deleted file mode 100644 index 75c0f16bea..0000000000 --- a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitterTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.manager.split; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.util.cache.Caches; -import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -public class SplitterTest { - - @Test - public void testIsSplittable() { - ThreadPools threadPools = createNiceMock(ThreadPools.class); - replay(threadPools); - ServerContext context = createNiceMock(ServerContext.class); - expect(context.threadPools()).andReturn(threadPools).anyTimes(); - expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - replay(context); - - var splitter = new Splitter(context); - - KeyExtent ke1 = new KeyExtent(TableId.of("1"), new Text("m"), null); - KeyExtent ke2 = new KeyExtent(TableId.of("1"), null, new Text("m")); - - Set<StoredTabletFile> files1 = new HashSet<>(); - files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")).insert()); - files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")).insert()); - - TabletMetadata tabletMeta1 = createMock(TabletMetadata.class); - 
expect(tabletMeta1.getExtent()).andReturn(ke1).anyTimes(); - expect(tabletMeta1.getFiles()).andReturn(files1).times(3); - replay(tabletMeta1); - - TabletMetadata tabletMeta2 = createMock(TabletMetadata.class); - expect(tabletMeta2.getExtent()).andReturn(ke2).anyTimes(); - replay(tabletMeta2); - - assertTrue(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.addSplitStarting(ke1); - - assertFalse(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.removeSplitStarting(ke1); - - assertTrue(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - splitter.rememberUnsplittable(tabletMeta1); - - assertFalse(splitter.isSplittable(tabletMeta1)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - // tabletMeta1 is currently unsplittable. Adding a file - // to it's file set should cause it to be removed from - // the unsplittable set of tablets, becoming splittable - // again. 
- files1.add(new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")).insert()); - assertTrue(splitter.isSplittable(tabletMeta1)); - - // when a tablets files change it should become a candidate for inspection - Set<StoredTabletFile> files2 = Set.of( - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")) - .insert(), - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")) - .insert(), - new ReferencedTabletFile( - new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000073.rf")) - .insert()); - TabletMetadata tabletMeta3 = createMock(TabletMetadata.class); - expect(tabletMeta3.getExtent()).andReturn(ke1).anyTimes(); - expect(tabletMeta3.getFiles()).andReturn(files2).anyTimes(); - replay(tabletMeta3); - - assertTrue(splitter.isSplittable(tabletMeta3)); - assertTrue(splitter.isSplittable(tabletMeta2)); - - verify(threadPools, context, tabletMeta1, tabletMeta2, tabletMeta3); - } - -}