This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f337357a527b1d8beab4ad6573fdde3229bd101d
Merge: 9fd7338ba4 859694aba8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Apr 11 20:07:16 2024 -0400

    Merge branch 'main' into elasticity

 assemble/bin/accumulo-cluster                            |  2 +-
 .../java/org/apache/accumulo/core/conf/Property.java     | 16 ++++++++--------
 .../server/manager/state/TabletManagementIterator.java   |  4 ++--
 .../org/apache/accumulo/server/split/SplitUtils.java     |  4 ++--
 4 files changed, 13 insertions(+), 13 deletions(-)

diff --cc assemble/bin/accumulo-cluster
index 82e53dbd4a,f525093404..b157f04c1e
--- a/assemble/bin/accumulo-cluster
+++ b/assemble/bin/accumulo-cluster
@@@ -156,9 -131,9 +156,9 @@@ function control_service() 
  
    for ((inst_id = 1; inst_id <= last_instance_id; inst_id++)); do
      ACCUMULO_SERVICE_INSTANCE=""
 -    [[ $service == "tserver" && ${NUM_TSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE=${inst_id}
 -    [[ $service == "compactor" ]] && ACCUMULO_SERVICE_INSTANCE="${inst_id}_${5}"
 -    [[ $service == "sserver" ]] && ACCUMULO_SERVICE_INSTANCE="${inst_id}_${5}"
 +    [[ $service == "tserver" && ${NUM_TSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}"
 +    [[ $service == "compactor" ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}"
-     [[ $service == "sserver" && ${NUM_SSERVERS:-1} -gt 1 ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}"
++    [[ $service == "sserver" ]] && ACCUMULO_SERVICE_INSTANCE="_${group}_${inst_id}"
  
      if [[ $host == localhost || $host == "$(hostname -s)" || $host == "$(hostname -f)" || "$(hostname -I)" =~ $host ]]; then
        #
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index d5a9504f28,f86e4cad3b..e33eed2748
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -433,13 -395,15 +433,21 @@@ public enum Property 
            + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available "
            + "threshold is set greater than 0.",
        "1.10.0"),
 +  MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT,
 +      "The number of threads used to inspect tablets files to find split points.", "4.0.0"),
 +
 +  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
 +      // ELASTICITY_TODO: It might be good to note that there is a priority queue per compactor
 +      // resource group
 +      "10000", PropertyType.COUNT, "The max size of the priority queue.", "4.0"),
+   SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
+       "System wide properties related to splitting tablets.", "3.1.0"),
+   SPLIT_MAXOPEN("split.files.max", "300", PropertyType.COUNT,
+       "To find a tablets split points, all RFiles are opened and their indexes"
+           + " are read. This setting determines how many RFiles can be opened at once."
 -          + " When there are more RFiles than this setting multiple passes must be"
 -          + " made, which is slower. However opening too many RFiles at once can cause"
 -          + " problems.",
++          + " When there are more RFiles than this setting the tablet will be marked"
++          + " as un-splittable.",
+       "3.1.0"),
    // properties that are specific to scan server behavior
    @Experimental
    SSERV_PREFIX("sserver.", null, PropertyType.PREFIX,
@@@ -545,14 -505,16 +553,6 @@@
    TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "5%", PropertyType.MEMORY,
        "The amount of memory used to store write-ahead-log mutations before flushing them.",
        "1.7.0"),
 -  @ReplacedBy(property = SPLIT_MAXOPEN)
 -  @Deprecated(since = "3.1")
--  TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "300",
--      PropertyType.COUNT,
--      "To find a tablets split points, all RFiles are opened and their indexes"
--          + " are read. This setting determines how many RFiles can be opened at once."
--          + " When there are more RFiles than this setting multiple passes must be"
--          + " made, which is slower. However opening too many RFiles at once can cause"
--          + " problems.",
--      "1.3.5"),
    TSERV_WAL_MAX_REFERENCED("tserver.wal.max.referenced", "3", PropertyType.COUNT,
        "When a tablet server has more than this many write ahead logs, any tablet referencing older "
            + "logs over this threshold is minor compacted.  Also any tablet referencing this many "
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index 39329b0e42,0000000000..b3ebe61c1d
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@@ -1,298 -1,0 +1,298 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.PluginEnvironment.Configuration;
 +import org.apache.accumulo.core.client.ScannerBase;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 +import org.apache.accumulo.core.metadata.TabletState;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 +import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
 +import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 +import org.apache.accumulo.server.fs.VolumeUtil;
 +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 +import org.apache.accumulo.server.split.SplitUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Iterator used by the TabletGroupWatcher threads in the Manager. This iterator returns
 + * TabletManagement objects for each Tablet that needs some type of action performed on it by the
 + * Manager.
 + */
 +public class TabletManagementIterator extends SkippingIterator {
 +  private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class);
 +  public static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams";
 +  private CompactionJobGenerator compactionGenerator;
 +  private TabletBalancer balancer;
 +
 +  private static boolean shouldReturnDueToSplit(final TabletMetadata tm,
 +      final Configuration tableConfig) {
 +
 +    final long splitThreshold = ConfigurationTypeHelper
 +        .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
 +    final long maxEndRowSize = ConfigurationTypeHelper
 +        .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey()));
-     final int maxFilesToOpen = (int) ConfigurationTypeHelper.getFixedMemoryAsBytes(
-         tableConfig.get(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN.getKey()));
++    final int maxFilesToOpen = (int) ConfigurationTypeHelper
++        .getFixedMemoryAsBytes(tableConfig.get(Property.SPLIT_MAXOPEN.getKey()));
 +
 +    // If the current computed metadata matches the current marker then we can't split,
 +    // so we return false. If the marker is set but doesn't match then return true
 +    // which gives a chance to clean up the marker and recheck.
 +    var unsplittable = tm.getUnSplittable();
 +    if (unsplittable != null) {
 +      return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(),
 +          splitThreshold, maxEndRowSize, maxFilesToOpen, tm.getFiles()));
 +    }
 +
 +    // If unsplittable is not set at all then check if over split threshold
 +    final boolean shouldSplit = SplitUtils.needsSplit(tableConfig, tm);
 +    LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(),
 +        tm.getFileSize(), splitThreshold, shouldSplit);
 +    return shouldSplit;
 +  }
 +
 +  private boolean shouldReturnDueToLocation(final TabletMetadata tm) {
 +
 +    if (tm.getExtent().isRootTablet()) {
 +      return true;
 +    }
 +
 +    if (tabletMgmtParams.getMigrations().containsKey(tm.getExtent())) {
 +      // Ideally only the state and goalState would need to be used to 
determine if a tablet should
 +      // be returned. However, the Manager/TGW currently needs everything in 
the migrating set
 +      // returned so it can update in memory maps it has. If this were 
improved then this case would
 +      // not be needed.
 +      return true;
 +    }
 +
 +    TabletState state = TabletState.compute(tm, 
tabletMgmtParams.getOnlineTsevers());
 +    TabletGoalState goalState = TabletGoalState.compute(tm, state, balancer, 
tabletMgmtParams);
 +    if (LOG.isTraceEnabled()) {
 +      LOG.trace(
 +          "extent:{} state:{} goalState:{} tabletAvailability:{}, 
hostingRequested: {}, opId: {}",
 +          tm.getExtent(), state, goalState, tm.getTabletAvailability(), 
tm.getHostingRequested(),
 +          tm.getOperationId());
 +    }
 +
 +    switch (goalState) {
 +      case HOSTED:
 +        return state != TabletState.HOSTED;
 +      case SUSPENDED:
 +        return state != TabletState.SUSPENDED;
 +      case UNASSIGNED:
 +        return state != TabletState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("unknown goal state " + goalState);
 +    }
 +  }
 +
 +  public static void configureScanner(final ScannerBase scanner,
 +      final TabletManagementParameters tabletMgmtParams) {
 +    // Note : if the scanner is ever made to fetch columns, then 
TabletManagement.CONFIGURED_COLUMNS
 +    // must be updated
 +    scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", 
WholeRowIterator.class));
 +    IteratorSetting tabletChange =
 +        new IteratorSetting(1001, "ManagerTabletInfoIterator", 
TabletManagementIterator.class);
 +    tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, 
tabletMgmtParams.serialize());
 +    scanner.addScanIterator(tabletChange);
 +  }
 +
 +  public static TabletManagement decode(Entry<Key,Value> e) throws 
IOException {
 +    return new TabletManagement(e.getKey(), e.getValue());
 +  }
 +
 +  private IteratorEnvironment env;
 +  private Key topKey = null;
 +  private Value topValue = null;
 +  private TabletManagementParameters tabletMgmtParams = null;
 +
 +  @Override
 +  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
 +      IteratorEnvironment env) throws IOException {
 +    super.init(source, options, env);
 +    this.env = env;
 +    tabletMgmtParams =
 +        
TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION));
 +    compactionGenerator =
 +        new CompactionJobGenerator(env.getPluginEnv(), 
tabletMgmtParams.getCompactionHints());
 +    final AccumuloConfiguration conf = new 
ConfigurationCopy(env.getPluginEnv().getConfiguration());
 +    BalancerEnvironmentImpl benv =
 +        new BalancerEnvironmentImpl(((TabletIteratorEnvironment) 
env).getServerContext());
 +    balancer = Property.createInstanceFromPropertyName(conf, 
Property.MANAGER_TABLET_BALANCER,
 +        TabletBalancer.class, new SimpleLoadBalancer());
 +    balancer.init(benv);
 +  }
 +
 +  @Override
 +  public Key getTopKey() {
 +    return topKey;
 +  }
 +
 +  @Override
 +  public Value getTopValue() {
 +    return topValue;
 +  }
 +
 +  @Override
 +  public boolean hasTop() {
 +    return topKey != null && topValue != null;
 +  }
 +
 +  @Override
 +  protected void consume() throws IOException {
 +    topKey = null;
 +    topValue = null;
 +
 +    final Set<ManagementAction> actions = new HashSet<>();
 +    while (getSource().hasTop()) {
 +      final Key k = getSource().getTopKey();
 +      final Value v = getSource().getTopValue();
 +      final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, 
v);
 +      final TabletMetadata tm = 
TabletMetadata.convertRow(decodedRow.entrySet().iterator(),
 +          TabletManagement.CONFIGURED_COLUMNS, false, true);
 +
 +      actions.clear();
 +      Exception error = null;
 +      try {
 +        LOG.trace("Evaluating extent: {}", tm);
 +        computeTabletManagementActions(tm, actions);
 +      } catch (Exception e) {
 +        LOG.error("Error computing tablet management actions for extent: {}", 
tm.getExtent(), e);
 +        error = e;
 +      }
 +
 +      if (!actions.isEmpty() || error != null) {
 +        if (error != null) {
 +          // Insert the error into K,V pair representing
 +          // the tablet metadata.
 +          TabletManagement.addError(decodedRow, error);
 +        } else if (!actions.isEmpty()) {
 +          // If we simply returned here, then the client would get the 
encoded K,V
 +          // from the WholeRowIterator. However, it would not know the 
reason(s) why
 +          // it was returned. Insert a K,V pair to represent the reasons. The 
client
 +          // can pull this K,V pair from the results by looking at the colf.
 +          TabletManagement.addActions(decodedRow, actions);
 +        }
 +        topKey = decodedRow.firstKey();
 +        topValue = WholeRowIterator.encodeRow(new 
ArrayList<>(decodedRow.keySet()),
 +            new ArrayList<>(decodedRow.values()));
 +        LOG.trace("Returning extent {} with reasons: {}", tm.getExtent(), 
actions);
 +        return;
 +      }
 +
 +      LOG.trace("No reason to return extent {}, continuing", tm.getExtent());
 +      getSource().next();
 +    }
 +  }
 +
 +  private static final Set<ManagementAction> REASONS_NOT_TO_SPLIT_OR_COMPACT =
 +      Collections.unmodifiableSet(EnumSet.of(ManagementAction.BAD_STATE,
 +          ManagementAction.NEEDS_VOLUME_REPLACEMENT, 
ManagementAction.NEEDS_RECOVERY));
 +
 +  /**
 +   * Evaluates whether or not this Tablet should be returned so that it can 
be acted upon by the
 +   * Manager
 +   */
 +  private void computeTabletManagementActions(final TabletMetadata tm,
 +      final Set<ManagementAction> reasonsToReturnThisTablet) {
 +
 +    if (tm.isFutureAndCurrentLocationSet()) {
 +      // no need to check everything, we are in a known state where we want 
to return everything.
 +      reasonsToReturnThisTablet.add(ManagementAction.BAD_STATE);
 +    }
 +
 +    if (!tm.getLogs().isEmpty() && (tm.getOperationId() == null
 +        || tm.getOperationId().getType() != TabletOperationType.DELETING)) {
 +      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_RECOVERY);
 +    }
 +
 +    if 
(VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), 
tm)) {
 +      
reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT);
 +    }
 +
 +    if (shouldReturnDueToLocation(tm)) {
 +      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
 +    }
 +
 +    if (tm.getOperationId() == null && 
tabletMgmtParams.isTableOnline(tm.getTableId())
 +        && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, 
reasonsToReturnThisTablet)) {
 +      try {
 +        if (shouldReturnDueToSplit(tm, 
this.env.getPluginEnv().getConfiguration(tm.getTableId()))) {
 +          reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING);
 +        }
 +        // important to call this since reasonsToReturnThisTablet is passed 
to it
 +        if (!compactionGenerator
 +            .generateJobs(tm, 
determineCompactionKinds(reasonsToReturnThisTablet)).isEmpty()) {
 +          reasonsToReturnThisTablet.add(ManagementAction.NEEDS_COMPACTING);
 +        }
 +      } catch (NullPointerException e) {
 +        LOG.info(
 +            "Unable to determine if tablet {} should split or compact, maybe 
table was deleted?",
 +            tm.getExtent());
 +      }
 +    }
 +  }
 +
 +  private static final Set<CompactionKind> ALL_COMPACTION_KINDS =
 +      Collections.unmodifiableSet(EnumSet.allOf(CompactionKind.class));
 +  private static final Set<CompactionKind> SPLIT_COMPACTION_KINDS;
 +
 +  static {
 +    var tmp = EnumSet.allOf(CompactionKind.class);
 +    tmp.remove(CompactionKind.SYSTEM);
 +    SPLIT_COMPACTION_KINDS = Collections.unmodifiableSet(tmp);
 +  }
 +
 +  public static Set<CompactionKind>
 +      determineCompactionKinds(Set<ManagementAction> 
reasonsToReturnThisTablet) {
 +    if (reasonsToReturnThisTablet.contains(ManagementAction.NEEDS_SPLITTING)) 
{
 +      return SPLIT_COMPACTION_KINDS;
 +    } else {
 +      return ALL_COMPACTION_KINDS;
 +    }
 +  }
 +}
diff --cc server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java
index fcf4e2422f,0000000000..eb5ceec1bf
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java
@@@ -1,334 -1,0 +1,334 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.split;
 +
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NoSuchElementException;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.function.Predicate;
 +
 +import org.apache.accumulo.core.client.PluginEnvironment.Configuration;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TabletFile;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterables;
 +
 +public class SplitUtils {
 +
 +  private static final Logger log = LoggerFactory.getLogger(SplitUtils.class);
 +
 +  static class IndexIterator implements Iterator<Key> {
 +
 +    private final SortedKeyValueIterator<Key,Value> source;
 +
 +    private final Text prevEndRow;
 +    private final Text endRow;
 +
 +    public IndexIterator(SortedKeyValueIterator<Key,Value> source, Text 
endRow, Text prevEndRow) {
 +      this.source = source;
 +      this.prevEndRow = prevEndRow;
 +      this.endRow = endRow;
 +    }
 +
 +    @Override
 +    public boolean hasNext() {
 +      if (prevEndRow != null) {
 +        // this code filters out data because the rfile index iterators do 
not support seek, so just
 +        // discard everything before our point of interest.
 +        while (source.hasTop() && 
source.getTopKey().getRow().compareTo(prevEndRow) <= 0) {
 +          try {
 +            source.next();
 +          } catch (IOException e) {
 +            throw new UncheckedIOException(e);
 +          }
 +        }
 +      }
 +
 +      if (endRow != null) {
 +        return source.hasTop() && 
source.getTopKey().getRow().compareTo(endRow) <= 0;
 +      }
 +
 +      return source.hasTop();
 +    }
 +
 +    @Override
 +    public Key next() {
 +      if (!hasNext()) {
 +        throw new NoSuchElementException();
 +      }
 +
 +      Key key = source.getTopKey();
 +
 +      try {
 +        source.next();
 +      } catch (IOException e) {
 +        throw new UncheckedIOException(e);
 +      }
 +
 +      return key;
 +    }
 +  }
 +
 +  private static ArrayList<FileSKVIterator> openIndexes(ServerContext context,
 +      TableConfiguration tableConf, Collection<StoredTabletFile> files) 
throws IOException {
 +
 +    ArrayList<FileSKVIterator> readers = new ArrayList<>();
 +
 +    try {
 +      for (TabletFile file : files) {
 +        FileSKVIterator reader = null;
 +        FileSystem ns = 
context.getVolumeManager().getFileSystemByPath(file.getPath());
 +
 +        reader = FileOperations.getInstance().newIndexReaderBuilder()
 +            .forFile(file, ns, ns.getConf(), tableConf.getCryptoService())
 +            .withTableConfiguration(tableConf).build();
 +
 +        readers.add(reader);
 +      }
 +    } catch (IOException ioe) {
 +      readers.forEach(reader -> {
 +        try {
 +          reader.close();
 +        } catch (IOException e) {
 +          log.debug("failed to close reader", e);
 +        }
 +      });
 +      throw ioe;
 +    }
 +
 +    return readers;
 +  }
 +
 +  public static class IndexIterable implements AutoCloseable, Iterable<Key> {
 +
 +    private final ServerContext context;
 +    private final TableConfiguration tableConf;
 +    private final Collection<StoredTabletFile> files;
 +    private final Text prevEndRow;
 +    private final Text endRow;
 +    private ArrayList<FileSKVIterator> readers;
 +
 +    public IndexIterable(ServerContext context, TableConfiguration tableConf,
 +        Collection<StoredTabletFile> files, Text endRow, Text prevEndRow) {
 +      this.context = context;
 +      this.tableConf = tableConf;
 +      this.files = files;
 +      this.prevEndRow = prevEndRow;
 +      this.endRow = endRow;
 +    }
 +
 +    @Override
 +    public Iterator<Key> iterator() {
 +      close();
 +      try {
 +        readers = openIndexes(context, tableConf, files);
 +      } catch (IOException e) {
 +        throw new UncheckedIOException(e);
 +      }
 +
 +      List<SortedKeyValueIterator<Key,Value>> iters = new 
ArrayList<>(readers);
 +      MultiIterator mmfi = new MultiIterator(iters, true);
 +
 +      return new IndexIterator(mmfi, endRow, prevEndRow);
 +    }
 +
 +    @Override
 +    public void close() {
 +      if (readers != null) {
 +        readers.forEach(reader -> {
 +          try {
 +            reader.close();
 +          } catch (IOException e) {
 +            log.debug("Failed to close index reader", e);
 +          }
 +        });
 +        readers = null;
 +      }
 +    }
 +  }
 +
 +  public static int calculateDesiredSplits(long esitimatedSize, long splitThreshold) {
 +    // ELASTICITY_TODO tablets used to always split into 2 tablets. Now the split operation will
 +    // split into many. How does this impact a tablet with many files and the estimated sizes after
 +    // split vs the old method. Need to run test where we add lots of data to a single tablet,
 +    // change the split thresh, wait for splits, then look at the estimated sizes, then compact and
 +    // look at the sizes after. For example if a tablet has 10M of data and the split thesh is set
 +    // to 100K, what will the est sizes look like across the tablets after splitting and then after
 +    // compacting?
 +    return (int) Math.floor((double) esitimatedSize / (double) splitThreshold);
 +  }
 +
 +  public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata tabletMetadata) {
 +    var tableConf = context.getTableConfiguration(tabletMetadata.getTableId());
 +    var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
 +    var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE);
 +
 +    // ELASTICITY_TODO rename and deprecate property. This is not executing in the tablet server
 +    // anymore.
-     int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
++    int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN);
 +
 +    var estimatedSize = tabletMetadata.getFileSize();
 +    if (!needsSplit(context, tabletMetadata)) {
 +      return new TreeSet<>();
 +    }
 +
 +    if (tabletMetadata.getFiles().size() >= maxFilesToOpen) {
 +      log.warn("Tablet {} has {} files which exceeds the max to open for split, so can not split.",
 +          tabletMetadata.getExtent(), tabletMetadata.getFiles().size());
 +      return new TreeSet<>();
 +    }
 +
 +    try (var indexIterable = new IndexIterable(context, tableConf, 
tabletMetadata.getFiles(),
 +        tabletMetadata.getEndRow(), tabletMetadata.getPrevEndRow())) {
 +
 +      Predicate<ByteSequence> splitPredicate = splitCandidate -> {
 +        if (splitCandidate.length() >= maxEndRowSize) {
 +          log.warn("Ignoring split point for {} of length {}", 
tabletMetadata.getExtent(),
 +              splitCandidate.length());
 +          return false;
 +        }
 +
 +        return true;
 +      };
 +
 +      return findSplits(indexIterable, calculateDesiredSplits(estimatedSize, 
threshold),
 +          splitPredicate);
 +    }
 +  }
 +
 +  private static int longestCommonLength(ByteSequence bs1, ByteSequence bs2) {
 +    int common = 0;
 +    while (common < bs1.length() && common < bs2.length()
 +        && bs1.byteAt(common) == bs2.byteAt(common)) {
 +      common++;
 +    }
 +    return common;
 +  }
 +
 +  public static SortedSet<Text> findSplits(Iterable<Key> tabletIndexIterator, 
int desiredSplits,
 +      Predicate<ByteSequence> rowPredicate) {
 +    Preconditions.checkArgument(desiredSplits >= 1);
 +
 +    int numKeys = Iterables.size(tabletIndexIterator);
 +
 +    double interSplitDistance = (double) numKeys / (double) (desiredSplits + 
1);
 +
 +    SortedSet<Text> splits = new TreeSet<>();
 +
 +    long count = 0;
 +
 +    ByteSequence prevRow = null;
 +    ByteSequence lastRow = null;
 +
 +    for (Key key : tabletIndexIterator) {
 +      if (lastRow != null && !key.getRowData().equals(lastRow)) {
 +        prevRow = lastRow;
 +      }
 +
 +      count++;
 +
 +      if (count >= Math.round((splits.size() + 1) * interSplitDistance)) {
 +        if (prevRow == null) {
 +          if (rowPredicate.test(key.getRowData())) {
 +            splits.add(key.getRow());
 +          }
 +        } else {
 +          var lcl = longestCommonLength(prevRow, key.getRowData());
 +          if (lcl + 1 >= key.getRowData().length()) {
 +            if (rowPredicate.test(key.getRowData())) {
 +              splits.add(key.getRow());
 +            }
 +          } else {
 +            var shortenedRow = key.getRowData().subSequence(0, lcl + 1);
 +            if (rowPredicate.test(shortenedRow)) {
 +              splits.add(new Text(shortenedRow.toArray()));
 +            }
 +          }
 +        }
 +
 +        if (splits.size() >= desiredSplits) {
 +          break;
 +        }
 +      }
 +
 +      lastRow = key.getRowData();
 +    }
 +
 +    return splits;
 +  }
 +
 +  public static boolean needsSplit(ServerContext context, TabletMetadata tabletMetadata) {
 +    var tableConf = context.getTableConfiguration(tabletMetadata.getTableId());
 +    var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
 +    return needsSplit(splitThreshold, tabletMetadata);
 +  }
 +
 +  public static boolean needsSplit(final Configuration tableConf, TabletMetadata tabletMetadata) {
 +    var splitThreshold = ConfigurationTypeHelper
 +        .getFixedMemoryAsBytes(tableConf.get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
 +    return needsSplit(splitThreshold, tabletMetadata);
 +  }
 +
 +  public static boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) {
 +    return tabletMetadata.getFileSize() > splitThreshold;
 +  }
 +
 +  public static UnSplittableMetadata toUnSplittable(ServerContext context,
 +      TabletMetadata tabletMetadata) {
 +    var tableConf = context.getTableConfiguration(tabletMetadata.getTableId());
 +    var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
 +    var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE);
-     int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
++    int maxFilesToOpen = tableConf.getCount(Property.SPLIT_MAXOPEN);
 +
 +    var unSplittableMetadata = UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(),
 +        splitThreshold, maxEndRowSize, maxFilesToOpen, tabletMetadata.getFiles());
 +
 +    log.trace(
 +        "Created unsplittable metadata for tablet {}. splitThreshold: {}, maxEndRowSize:{}, maxFilesToOpen: {}, hashCode: {}",
 +        tabletMetadata.getExtent(), splitThreshold, maxEndRowSize, maxFilesToOpen,
 +        unSplittableMetadata);
 +
 +    return unSplittableMetadata;
 +  }
 +
 +}

Reply via email to