This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f337357a527b1d8beab4ad6573fdde3229bd101d Merge: 9fd7338ba4 859694aba8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Apr 11 20:07:16 2024 -0400 Merge branch 'main' into elasticity assemble/bin/accumulo-cluster | 2 +- .../java/org/apache/accumulo/core/conf/Property.java | 16 ++++++++-------- .../server/manager/state/TabletManagementIterator.java | 4 ++-- .../org/apache/accumulo/server/split/SplitUtils.java | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --cc assemble/bin/accumulo-cluster index 82e53dbd4a,f525093404..b157f04c1e --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@@ -156,9 -131,9 +156,9 @@@ function control_service() for ((inst_id = 1; inst_id <= last_instance_id; inst_id++)); do ACCUMULO_SERVICE_INSTANCE="" - [[ $service == "tserver" && ${NUM_TSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE=${inst_id} - [[ $service == "compactor" ]] && ACCUMULO_SERVICE_INSTANCE="${inst_id}_${5}" - [[ $service == "sserver" ]] && ACCUMULO_SERVICE_INSTANCE="${inst_id}_${5}" + [[ $service == "tserver" && ${NUM_TSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}" + [[ $service == "compactor" ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}" - [[ $service == "sserver" && ${NUM_SSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}" ++ [[ $service == "sserver" ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}" if [[ $host == localhost || $host == "$(hostname -s)" || $host == "$(hostname -f)" || "$(hostname -I)" =~ $host ]]; then # diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index d5a9504f28,f86e4cad3b..e33eed2748 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -433,13 -395,15 +433,21 @@@ public enum Property + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), + MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT, + "The number of threads used to inspect tablets files to find split points.", "4.0.0"), + + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", + // ELASTICITY_TODO: It might be good to note that there is a priority queue per compactor + // resource group + "10000", PropertyType.COUNT, "The max size of the priority queue.", "4.0"), + SPLIT_PREFIX("split.", null, PropertyType.PREFIX, + "System wide properties related to splitting tablets.", "3.1.0"), + SPLIT_MAXOPEN("split.files.max", "300", PropertyType.COUNT, + "To find a tablets split points, all RFiles are opened and their indexes" + + " are read. This setting determines how many RFiles can be opened at once." - + " When there are more RFiles than this setting multiple passes must be" - + " made, which is slower. 
However opening too many RFiles at once can cause" - + " problems.", ++ + " When there are more RFiles than this setting the tablet will be marked" ++ + " as un-splittable.", + "3.1.0"), // properties that are specific to scan server behavior @Experimental SSERV_PREFIX("sserver.", null, PropertyType.PREFIX, @@@ -545,14 -505,16 +553,6 @@@ TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "5%", PropertyType.MEMORY, "The amount of memory used to store write-ahead-log mutations before flushing them.", "1.7.0"), - @ReplacedBy(property = SPLIT_MAXOPEN) - @Deprecated(since = "3.1") -- TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "300", -- PropertyType.COUNT, -- "To find a tablets split points, all RFiles are opened and their indexes" -- + " are read. This setting determines how many RFiles can be opened at once." -- + " When there are more RFiles than this setting multiple passes must be" -- + " made, which is slower. However opening too many RFiles at once can cause" -- + " problems.", -- "1.3.5"), TSERV_WAL_MAX_REFERENCED("tserver.wal.max.referenced", "3", PropertyType.COUNT, "When a tablet server has more than this many write ahead logs, any tablet referencing older " + "logs over this threshold is minor compacted. Also any tablet referencing this many " diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 39329b0e42,0000000000..b3ebe61c1d mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@@ -1,298 -1,0 +1,298 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.manager.state; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.PluginEnvironment.Configuration; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; +import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.fs.VolumeUtil; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; +import org.apache.accumulo.server.split.SplitUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iterator used by the TabletGroupWatcher threads in the Manager. This iterator returns + * TabletManagement objects for each Tablet that needs some type of action performed on it by the + * Manager. + */ +public class TabletManagementIterator extends SkippingIterator { + private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class); + public static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams"; + private CompactionJobGenerator compactionGenerator; + private TabletBalancer balancer; + + private static boolean shouldReturnDueToSplit(final TabletMetadata tm, + final Configuration tableConfig) { + + final long splitThreshold = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + final long maxEndRowSize = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); - final int maxFilesToOpen = (int) ConfigurationTypeHelper.getFixedMemoryAsBytes( - tableConfig.get(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN.getKey())); ++ final int maxFilesToOpen = (int) ConfigurationTypeHelper ++ .getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey())); + + // If the current computed metadata matches the current marker then we can't split, + // so we return false. If the marker is set but doesn't match then return true + // which gives a chance to clean up the marker and recheck. 
+ var unsplittable = tm.getUnSplittable(); + if (unsplittable != null) { + return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), + splitThreshold, maxEndRowSize, maxFilesToOpen, tm.getFiles())); + } + + // If unsplittable is not set at all then check if over split threshold + final boolean shouldSplit = SplitUtils.needsSplit(tableConfig, tm); + LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), + tm.getFileSize(), splitThreshold, shouldSplit); + return shouldSplit; + } + + private boolean shouldReturnDueToLocation(final TabletMetadata tm) { + + if (tm.getExtent().isRootTablet()) { + return true; + } + + if (tabletMgmtParams.getMigrations().containsKey(tm.getExtent())) { + // Ideally only the state and goalState would need to be used to determine if a tablet should + // be returned. However, the Manager/TGW currently needs everything in the migrating set + // returned so it can update in memory maps it has. If this were improved then this case would + // not be needed. + return true; + } + + TabletState state = TabletState.compute(tm, tabletMgmtParams.getOnlineTsevers()); + TabletGoalState goalState = TabletGoalState.compute(tm, state, balancer, tabletMgmtParams); + if (LOG.isTraceEnabled()) { + LOG.trace( + "extent:{} state:{} goalState:{} tabletAvailability:{}, hostingRequested: {}, opId: {}", + tm.getExtent(), state, goalState, tm.getTabletAvailability(), tm.getHostingRequested(), + tm.getOperationId()); + } + + switch (goalState) { + case HOSTED: + return state != TabletState.HOSTED; + case SUSPENDED: + return state != TabletState.SUSPENDED; + case UNASSIGNED: + return state != TabletState.UNASSIGNED; + default: + throw new IllegalStateException("unknown goal state " + goalState); + } + } + + public static void configureScanner(final ScannerBase scanner, + final TabletManagementParameters tabletMgmtParams) { + // Note : if the scanner is ever made to fetch columns, then TabletManagement.CONFIGURED_COLUMNS + // must be updated + scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); + IteratorSetting tabletChange = + new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); + tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, tabletMgmtParams.serialize()); + scanner.addScanIterator(tabletChange); + } + + public static TabletManagement decode(Entry<Key,Value> e) throws IOException { + return new TabletManagement(e.getKey(), e.getValue()); + } + + private IteratorEnvironment env; + private Key topKey = null; + private Value topValue = null; + private TabletManagementParameters tabletMgmtParams = null; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + this.env = env; + tabletMgmtParams = + TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION)); + compactionGenerator = + new CompactionJobGenerator(env.getPluginEnv(), tabletMgmtParams.getCompactionHints()); + final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration()); + BalancerEnvironmentImpl benv = + new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); + balancer = Property.createInstanceFromPropertyName(conf, Property.MANAGER_TABLET_BALANCER, + TabletBalancer.class, new SimpleLoadBalancer()); + balancer.init(benv); + } + + @Override + public Key getTopKey() { + return topKey; + } + + 
@Override + public Value getTopValue() { + return topValue; + } + + @Override + public boolean hasTop() { + return topKey != null && topValue != null; + } + + @Override + protected void consume() throws IOException { + topKey = null; + topValue = null; + + final Set<ManagementAction> actions = new HashSet<>(); + while (getSource().hasTop()) { + final Key k = getSource().getTopKey(); + final Value v = getSource().getTopValue(); + final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v); + final TabletMetadata tm = TabletMetadata.convertRow(decodedRow.entrySet().iterator(), + TabletManagement.CONFIGURED_COLUMNS, false, true); + + actions.clear(); + Exception error = null; + try { + LOG.trace("Evaluating extent: {}", tm); + computeTabletManagementActions(tm, actions); + } catch (Exception e) { + LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); + error = e; + } + + if (!actions.isEmpty() || error != null) { + if (error != null) { + // Insert the error into K,V pair representing + // the tablet metadata. + TabletManagement.addError(decodedRow, error); + } else if (!actions.isEmpty()) { + // If we simply returned here, then the client would get the encoded K,V + // from the WholeRowIterator. However, it would not know the reason(s) why + // it was returned. Insert a K,V pair to represent the reasons. The client + // can pull this K,V pair from the results by looking at the colf. + TabletManagement.addActions(decodedRow, actions); + } + topKey = decodedRow.firstKey(); + topValue = WholeRowIterator.encodeRow(new ArrayList<>(decodedRow.keySet()), + new ArrayList<>(decodedRow.values())); + LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), actions); + return; + } + + LOG.trace("No reason to return extent {}, continuing", tm.getExtent()); + getSource().next(); + } + } + + private static final Set<ManagementAction> REASONS_NOT_TO_SPLIT_OR_COMPACT = + Collections.unmodifiableSet(EnumSet.of(ManagementAction.BAD_STATE, + ManagementAction.NEEDS_VOLUME_REPLACEMENT, ManagementAction.NEEDS_RECOVERY)); + + /** + * Evaluates whether or not this Tablet should be returned so that it can be acted upon by the + * Manager + */ + private void computeTabletManagementActions(final TabletMetadata tm, + final Set<ManagementAction> reasonsToReturnThisTablet) { + + if (tm.isFutureAndCurrentLocationSet()) { + // no need to check everything, we are in a known state where we want to return everything. 
+ reasonsToReturnThisTablet.add(ManagementAction.BAD_STATE); + } + + if (!tm.getLogs().isEmpty() && (tm.getOperationId() == null + || tm.getOperationId().getType() != TabletOperationType.DELETING)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_RECOVERY); + } + + if (VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), tm)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT); + } + + if (shouldReturnDueToLocation(tm)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); + } + + if (tm.getOperationId() == null && tabletMgmtParams.isTableOnline(tm.getTableId()) + && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, reasonsToReturnThisTablet)) { + try { + if (shouldReturnDueToSplit(tm, this.env.getPluginEnv().getConfiguration(tm.getTableId()))) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); + } + // important to call this since reasonsToReturnThisTablet is passed to it + if (!compactionGenerator + .generateJobs(tm, determineCompactionKinds(reasonsToReturnThisTablet)).isEmpty()) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING); + } + } catch (NullPointerException e) { + LOG.info( + "Unable to determine if tablet {} should split or compact, maybe table was deleted?", + tm.getExtent()); + } + } + } + + private static final Set<CompactionKind> ALL_COMPACTION_KINDS = + Collections.unmodifiableSet(EnumSet.allOf(CompactionKind.class)); + private static final Set<CompactionKind> SPLIT_COMPACTION_KINDS; + + static { + var tmp = EnumSet.allOf(CompactionKind.class); + tmp.remove(CompactionKind.SYSTEM); + SPLIT_COMPACTION_KINDS = Collections.unmodifiableSet(tmp); + } + + public static Set<CompactionKind> + determineCompactionKinds(Set<ManagementAction> reasonsToReturnThisTablet) { + if (reasonsToReturnThisTablet.contains(ManagementAction.NEEDS_SPLITTING)) { + return SPLIT_COMPACTION_KINDS; + } else { + return ALL_COMPACTION_KINDS; + } + } +} diff --cc server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java index fcf4e2422f,0000000000..eb5ceec1bf mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java @@@ -1,334 -1,0 +1,334 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.split; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.function.Predicate; + +import org.apache.accumulo.core.client.PluginEnvironment.Configuration; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +public class SplitUtils { + + private static final Logger log = LoggerFactory.getLogger(SplitUtils.class); + + static class IndexIterator implements Iterator<Key> { + + private final SortedKeyValueIterator<Key,Value> source; + + private final Text prevEndRow; + private final Text endRow; + + public IndexIterator(SortedKeyValueIterator<Key,Value> source, Text endRow, Text prevEndRow) { + this.source = source; + this.prevEndRow = prevEndRow; + this.endRow = endRow; + } + + @Override + public boolean hasNext() { + if (prevEndRow != null) { + // this code filters out data because the rfile index iterators do not support seek, so just + // discard everything before our point of interest. 
+ while (source.hasTop() && source.getTopKey().getRow().compareTo(prevEndRow) <= 0) { + try { + source.next(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + if (endRow != null) { + return source.hasTop() && source.getTopKey().getRow().compareTo(endRow) <= 0; + } + + return source.hasTop(); + } + + @Override + public Key next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Key key = source.getTopKey(); + + try { + source.next(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return key; + } + } + + private static ArrayList<FileSKVIterator> openIndexes(ServerContext context, + TableConfiguration tableConf, Collection<StoredTabletFile> files) throws IOException { + + ArrayList<FileSKVIterator> readers = new ArrayList<>(); + + try { + for (TabletFile file : files) { + FileSKVIterator reader = null; + FileSystem ns = context.getVolumeManager().getFileSystemByPath(file.getPath()); + + reader = FileOperations.getInstance().newIndexReaderBuilder() + .forFile(file, ns, ns.getConf(), tableConf.getCryptoService()) + .withTableConfiguration(tableConf).build(); + + readers.add(reader); + } + } catch (IOException ioe) { + readers.forEach(reader -> { + try { + reader.close(); + } catch (IOException e) { + log.debug("failed to close reader", e); + } + }); + throw ioe; + } + + return readers; + } + + public static class IndexIterable implements AutoCloseable, Iterable<Key> { + + private final ServerContext context; + private final TableConfiguration tableConf; + private final Collection<StoredTabletFile> files; + private final Text prevEndRow; + private final Text endRow; + private ArrayList<FileSKVIterator> readers; + + public IndexIterable(ServerContext context, TableConfiguration tableConf, + Collection<StoredTabletFile> files, Text endRow, Text prevEndRow) { + this.context = context; + this.tableConf = tableConf; + this.files = files; + this.prevEndRow = prevEndRow; + this.endRow = endRow; + } + + @Override + public Iterator<Key> iterator() { + close(); + try { + readers = openIndexes(context, tableConf, files); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(readers); + MultiIterator mmfi = new MultiIterator(iters, true); + + return new IndexIterator(mmfi, endRow, prevEndRow); + } + + @Override + public void close() { + if (readers != null) { + readers.forEach(reader -> { + try { + reader.close(); + } catch (IOException e) { + log.debug("Failed to close index reader", e); + } + }); + readers = null; + } + } + } + + public static int calculateDesiredSplits(long esitimatedSize, long splitThreshold) { + // ELASTICITY_TODO tablets used to always split into 2 tablets. Now the split operation will + // split into many. How does this impact a tablet with many files and the estimated sizes after + // split vs the old method. Need to run test where we add lots of data to a single tablet, + // change the split thresh, wait for splits, then look at the estimated sizes, then compact and + // look at the sizes after. For example if a tablet has 10M of data and the split thesh is set + // to 100K, what will the est sizes look like across the tablets after splitting and then after + // compacting? 
+ return (int) Math.floor((double) esitimatedSize / (double) splitThreshold); + } + + public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata tabletMetadata) { + var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); + var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE); + + // ELASTICITY_TODO rename and deprecate property. This is not executing in the tablet server + // anymore. - int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); ++ int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN); + + var estimatedSize = tabletMetadata.getFileSize(); + if (!needsSplit(context, tabletMetadata)) { + return new TreeSet<>(); + } + + if (tabletMetadata.getFiles().size() >= maxFilesToOpen) { + log.warn("Tablet {} has {} files which exceeds the max to open for split, so can not split.", + tabletMetadata.getExtent(), tabletMetadata.getFiles().size()); + return new TreeSet<>(); + } + + try (var indexIterable = new IndexIterable(context, tableConf, tabletMetadata.getFiles(), + tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) { + + Predicate<ByteSequence> splitPredicate = splitCandidate -> { + if (splitCandidate.length() >= maxEndRowSize) { + log.warn("Ignoring split point for {} of length {}", tabletMetadata.getExtent(), + splitCandidate.length()); + return false; + } + + return true; + }; + + return findSplits(indexIterable, calculateDesiredSplits(estimatedSize, threshold), + splitPredicate); + } + } + + private static int longestCommonLength(ByteSequence bs1, ByteSequence bs2) { + int common = 0; + while (common < bs1.length() && common < bs2.length() + && bs1.byteAt(common) == bs2.byteAt(common)) { + common++; + } + return common; + } + + public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, int desiredSplits, + Predicate<ByteSequence> rowPredicate) { + Preconditions.checkArgument(desiredSplits >= 1); + + int numKeys = Iterables.size(tabletIndexIterator); + + double interSplitDistance = (double) numKeys / (double) (desiredSplits + 1); + + SortedSet<Text> splits = new TreeSet<>(); + + long count = 0; + + ByteSequence prevRow = null; + ByteSequence lastRow = null; + + for (Key key : tabletIndexIterator) { + if (lastRow != null && !key.getRowData().equals(lastRow)) { + prevRow = lastRow; + } + + count++; + + if (count >= Math.round((splits.size() + 1) * interSplitDistance)) { + if (prevRow == null) { + if (rowPredicate.test(key.getRowData())) { + splits.add(key.getRow()); + } + } else { + var lcl = longestCommonLength(prevRow, key.getRowData()); + if (lcl + 1 >= key.getRowData().length()) { + if (rowPredicate.test(key.getRowData())) { + splits.add(key.getRow()); + } + } else { + var shortenedRow = key.getRowData().subSequence(0, lcl + 1); + if (rowPredicate.test(shortenedRow)) { + splits.add(new Text(shortenedRow.toArray())); + } + } + } + + if (splits.size() >= desiredSplits) { + break; + } + } + + lastRow = key.getRowData(); + } + + return splits; + } + + public static boolean needsSplit(ServerContext context, TabletMetadata tabletMetadata) { + var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); + var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + return needsSplit(splitThreshold, tabletMetadata); + } + + public static boolean needsSplit(final Configuration tableConf, TabletMetadata tabletMetadata) { + var splitThreshold = 
ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConf.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + return needsSplit(splitThreshold, tabletMetadata); + } + + public static boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) { + return tabletMetadata.getFileSize() > splitThreshold; + } + + public static UnSplittableMetadata toUnSplittable(ServerContext context, + TabletMetadata tabletMetadata) { + var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); + var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE); - int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); ++ int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN); + + var unSplittableMetadata = UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(), + splitThreshold, maxEndRowSize, maxFilesToOpen, tabletMetadata.getFiles()); + + log.trace( + "Created unsplittable metadata for tablet {}. splitThreshold: {}, maxEndRowSize:{}, maxFilesToOpen: {}, hashCode: {}", + tabletMetadata.getExtent(), splitThreshold, maxEndRowSize, maxFilesToOpen, + unSplittableMetadata); + + return unSplittableMetadata; + } + +}
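
[Editor's note, not part of the commit: the merged SplitUtils.calculateDesiredSplits floors the ratio of a tablet's estimated size to its split threshold, so a single inspection pass can now propose many split points instead of the old two-way split. A minimal standalone sketch of that arithmetic follows; the class name is illustrative and the 10M / 100K figures are the hypothetical scenario from the ELASTICITY_TODO comment, not measured results.]

public class DesiredSplitsSketch {
  // Sketch only: mirrors the floor(estimatedSize / splitThreshold) arithmetic
  // of SplitUtils.calculateDesiredSplits in the diff above.
  static int calculateDesiredSplits(long estimatedSize, long splitThreshold) {
    return (int) Math.floor((double) estimatedSize / (double) splitThreshold);
  }

  public static void main(String[] args) {
    long estimatedSize = 10L * 1024 * 1024; // 10M of tablet data, per the TODO comment's example
    long splitThreshold = 100L * 1024;      // table.split.threshold lowered to 100K
    // prints 102 = floor(10485760 / 102400): one pass yields many desired split points
    System.out.println(calculateDesiredSplits(estimatedSize, splitThreshold));
  }
}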
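
[Editor's note, not part of the commit: the shouldReturnDueToSplit change above only hands a tablet back to the Manager when its stored un-splittable marker no longer matches a marker recomputed from the current split threshold, max end-row size, split.files.max and file set; otherwise it falls back to the plain size check. A simplified, hypothetical sketch of that comparison follows; the UnsplittableMarker record and class below are stand-ins, not the real UnSplittableMetadata API.]

import java.util.Set;

// Stand-in for UnSplittableMetadata: equality over the inputs that drove the decision.
record UnsplittableMarker(long splitThreshold, long maxEndRowSize, int maxFilesToOpen,
    Set<String> files) {}

class SplitRecheckSketch {
  // Sketch of the logic in TabletManagementIterator.shouldReturnDueToSplit above.
  static boolean shouldReturnDueToSplit(UnsplittableMarker stored, UnsplittableMarker current,
      long fileSize, long splitThreshold) {
    if (stored != null) {
      // Marker still matches the freshly computed value: tablet remains un-splittable, skip it.
      // Marker is stale: return the tablet so the Manager can clear the marker and recheck.
      return !stored.equals(current);
    }
    // No marker set: fall back to the plain size check (SplitUtils.needsSplit).
    return fileSize > splitThreshold;
  }
}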