This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 46b934f5dc2 Add initial TabletMergeability support to client api (#5274) 46b934f5dc2 is described below commit 46b934f5dc20c61367c24da78b9f1061a38ff8bc Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sun Jan 19 14:52:01 2025 -0500 Add initial TabletMergeability support to client api (#5274) This is a follow on PR to #5187 for Issue #5014 This commit adds the first part of the needed client API changes to support setting TabletMergeability on splits. Two initial updates have been made to support table creation and adding splits later. 1) A new withSplits() method has been added to NewTableConfiguration that takes a map of splits and associated TabletMergeability. The existing withSplits() method that takes a set just delegates to the new method and sets a default for TabletMergeability to never because by default user created tables should never be automatically mergeable. 2) A new putSplits() method has been added that also takes a map and will create new splits with an associated TabletMergeability. The existing addSplits() method delegates to this as well. Note that this method is only partially done. The intent is to support updating existing splits as well but this will be done in another PR. To support the changes, where necessary the splits are now encoded with the TabletMergeability value as json instead of just passing the split bytes. 
--------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/client/admin/NewTableConfiguration.java | 27 +++++- .../core/client/admin/TableOperations.java | 27 ++++++ .../core/clientImpl/TableOperationsImpl.java | 45 ++++++--- .../core/clientImpl/TabletMergeabilityUtil.java | 104 +++++++++++++++++++++ .../schema/TabletMergeabilityMetadata.java | 20 ++++ .../client/admin/NewTableConfigurationTest.java | 37 +++++++- .../core/clientImpl/TableOperationsHelperTest.java | 5 + .../clientImpl/TabletMergeabilityUtilTest.java | 63 +++++++++++++ .../accumulo/manager/FateServiceHandler.java | 31 +++--- .../accumulo/manager/tableOps/TableInfo.java | 12 +-- .../apache/accumulo/manager/tableOps/Utils.java | 20 ++++ .../manager/tableOps/create/ChooseDir.java | 6 +- .../manager/tableOps/create/CreateTable.java | 6 +- .../manager/tableOps/create/PopulateMetadata.java | 40 +++++--- .../split/AllocateDirsAndEnsureOnline.java | 2 +- .../manager/tableOps/split/DeleteOperationIds.java | 4 +- .../manager/tableOps/split/FindSplits.java | 6 +- .../accumulo/manager/tableOps/split/PreSplit.java | 17 ++-- .../accumulo/manager/tableOps/split/SplitInfo.java | 42 +++++---- .../manager/tableOps/split/UpdateTablets.java | 37 +++++--- .../manager/tableOps/split/UpdateTabletsTest.java | 19 +++- .../apache/accumulo/test/fate/ManagerRepoIT.java | 5 +- .../accumulo/test/functional/AddSplitIT.java | 38 ++++++++ .../test/functional/CreateInitialSplitsIT.java | 33 +++++++ 24 files changed, 536 insertions(+), 110 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java index ffb6b46deac..73e59e0a381 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Map.Entry; import 
java.util.Objects; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; import org.apache.accumulo.core.client.AccumuloException; @@ -38,6 +39,7 @@ import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.user.VersioningIterator; @@ -48,7 +50,7 @@ import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.hadoop.io.Text; -import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.ImmutableSortedMap; /** * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to @@ -71,7 +73,7 @@ public class NewTableConfiguration { private Map<String,String> summarizerProps = Collections.emptyMap(); private Map<String,String> localityProps = Collections.emptyMap(); private final Map<String,String> iteratorProps = new HashMap<>(); - private SortedSet<Text> splitProps = Collections.emptySortedSet(); + private SortedMap<Text,TabletMergeability> splitProps = Collections.emptySortedMap(); private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND; private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, @@ -188,6 +190,18 @@ public class NewTableConfiguration { * @since 2.0.0 */ public Collection<Text> getSplits() { + return splitProps.keySet(); + } + + /** + * Return Collection of split values and associated TabletMergeability. 
+ * + * @return Collection containing splits and TabletMergeability associated with this + * NewTableConfiguration object. + * + * @since 4.0.0 + */ + public SortedMap<Text,TabletMergeability> getSplitsMap() { return splitProps; } @@ -258,10 +272,17 @@ public class NewTableConfiguration { * * @since 2.0.0 */ + @SuppressWarnings("unchecked") public NewTableConfiguration withSplits(final SortedSet<Text> splits) { checkArgument(splits != null, "splits set is null"); checkArgument(!splits.isEmpty(), "splits set is empty"); - this.splitProps = ImmutableSortedSet.copyOf(splits); + return withSplits(TabletMergeabilityUtil.userDefaultSplits(splits)); + } + + public NewTableConfiguration withSplits(final SortedMap<Text,TabletMergeability> splits) { + checkArgument(splits != null, "splits set is null"); + checkArgument(!splits.isEmpty(), "splits set is empty"); + this.splitProps = ImmutableSortedMap.copyOf(splits); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java index 0a756c1b2b0..fc3734c0bcb 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -187,6 +188,32 @@ public interface TableOperations { void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException; + /** + * + * Ensures that tablets are split along a set of keys. + * + * TODO: This method currently only adds new splits (existing are stripped). 
The intent in a + * future PR is so support updating existing splits and the TabletMergeabilty setting. See + * https://github.com/apache/accumulo/issues/5014 + * + * <p> + * Note that while the documentation for Text specifies that its bytestream should be UTF-8, the + * encoding is not enforced by operations that work with byte arrays. + * <p> + * For example, you can create 256 evenly-sliced splits via the following code sample even though + * the given byte sequences are not valid UTF-8. + * + * @param tableName the name of the table + * @param splits a sorted map of row key values to pre-split the table on and associated + * TabletMergeability + * + * @throws AccumuloException if a general error occurs + * @throws AccumuloSecurityException if the user does not have permission + * @throws TableNotFoundException if the table does not exist + */ + void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException; + /** * @param tableName the name of the table * @return the split points (end-row names) for the table's current split profile diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 0b1ff708519..9f70d038a9a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -66,6 +66,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -105,6 +106,7 @@ import org.apache.accumulo.core.client.admin.SummaryRetriever; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.TabletAvailability; import 
org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; @@ -263,11 +265,11 @@ public class TableOperationsImpl extends TableOperationsHelper { // Check for possible initial splits to be added at table creation // Always send number of initial splits to be created, even if zero. If greater than zero, // add the splits to the argument List which will be used by the FATE operations. - int numSplits = ntc.getSplits().size(); + int numSplits = ntc.getSplitsMap().size(); args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8))); if (numSplits > 0) { - for (Text t : ntc.getSplits()) { - args.add(TextUtil.getByteBuffer(t)); + for (Entry<Text,TabletMergeability> t : ntc.getSplitsMap().entrySet()) { + args.add(TabletMergeabilityUtil.encodeAsBuffer(t.getKey(), t.getValue())); } } @@ -475,8 +477,15 @@ public class TableOperationsImpl extends TableOperationsHelper { public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED"; @Override + @SuppressWarnings("unchecked") public void addSplits(String tableName, SortedSet<Text> splits) throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + putSplits(tableName, TabletMergeabilityUtil.userDefaultSplits(splits)); + } + + @Override + public void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException { EXISTING_TABLE_NAME.validate(tableName); @@ -487,7 +496,8 @@ public class TableOperationsImpl extends TableOperationsHelper { ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId); - SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits)); + SortedMap<Text,TabletMergeability> 
splitsTodo = + Collections.synchronizedSortedMap(new TreeMap<>(splits)); final ByteBuffer EMPTY = ByteBuffer.allocate(0); @@ -500,13 +510,14 @@ public class TableOperationsImpl extends TableOperationsHelper { tabLocator.invalidateCache(); - Map<KeyExtent,List<Text>> tabletSplits = + Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo); List<CompletableFuture<Void>> futures = new ArrayList<>(); // begin the fate operation for each tablet without waiting for the operation to complete - for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) { + for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits + .entrySet()) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { var extent = splitsForTablet.getKey(); @@ -515,7 +526,9 @@ public class TableOperationsImpl extends TableOperationsHelper { args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow())); args.add( extent.prevEndRow() == null ? 
EMPTY : TextUtil.getByteBuffer(extent.prevEndRow())); - splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split))); + + splitsForTablet.getValue() + .forEach(split -> args.add(TabletMergeabilityUtil.encodeAsBuffer(split))); try { return handleFateOperation(() -> { @@ -533,13 +546,13 @@ public class TableOperationsImpl extends TableOperationsHelper { // wait for the fate operation to complete in a separate thread pool }, startExecutor).thenApplyAsync(pair -> { final TFateId opid = pair.getFirst(); - final List<Text> completedSplits = pair.getSecond(); + final List<Pair<Text,TabletMergeability>> completedSplits = pair.getSecond(); try { String status = handleFateOperation(() -> waitForFateOperation(opid), tableName); if (SPLIT_SUCCESS_MSG.equals(status)) { - completedSplits.forEach(splitsTodo::remove); + completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove); } } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException | AccumuloSecurityException | TableNotFoundException | AccumuloException e) { @@ -593,14 +606,15 @@ public class TableOperationsImpl extends TableOperationsHelper { waitExecutor.shutdown(); } - private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId, - ClientTabletCache tabLocator, SortedSet<Text> splitsTodo) + private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName, + TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>(); + Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>(); - var iterator = splitsTodo.iterator(); + var iterator = splitsTodo.entrySet().iterator(); while (iterator.hasNext()) { - var split = iterator.next(); + var splitEntry = iterator.next(); + var split = splitEntry.getKey(); try { Retry 
retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) @@ -624,7 +638,8 @@ public class TableOperationsImpl extends TableOperationsHelper { continue; } - tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()).add(split); + tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()) + .add(Pair.fromEntry(splitEntry)); } catch (InvalidTabletHostingRequestException e) { // not expected diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java new file mode 100644 index 00000000000..c4c8d3a873c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.clientImpl; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Comparator; +import java.util.Objects; +import java.util.Optional; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.function.Function; + +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedMap; +import com.google.gson.Gson; + +public class TabletMergeabilityUtil { + + private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson(); + + public static SortedMap<Text,TabletMergeability> userDefaultSplits(SortedSet<Text> splits) { + return splitsWithDefault(splits, TabletMergeability.never()); + } + + public static SortedMap<Text,TabletMergeability> systemDefaultSplits(SortedSet<Text> splits) { + return splitsWithDefault(splits, TabletMergeability.always()); + } + + @SuppressWarnings("unchecked") + public static SortedMap<Text,TabletMergeability> splitsWithDefault(SortedSet<Text> splits, + TabletMergeability tabletMergeability) { + Objects.requireNonNull(tabletMergeability); + return splits.stream() + .collect(ImmutableSortedMap.toImmutableSortedMap(Optional + .ofNullable((Comparator<Text>) splits.comparator()).orElse(Comparator.naturalOrder()), + Function.identity(), t -> tabletMergeability)); + } + + public static ByteBuffer encodeAsBuffer(Text split, TabletMergeability tabletMergeability) { + return ByteBuffer.wrap(encode(split, tabletMergeability).getBytes(UTF_8)); + } + + // Encode A split and TabletMergeability as json. 
The split will be Base64 encoded + public static String encode(Text split, TabletMergeability tabletMergeability) { + GSonData jData = new GSonData(); + jData.split = TextUtil.getBytes(split); + jData.never = tabletMergeability.isNever(); + jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null); + return gson.toJson(jData); + } + + public static ByteBuffer encodeAsBuffer(Pair<Text,TabletMergeability> split) { + return ByteBuffer.wrap(encode(split).getBytes(UTF_8)); + } + + public static String encode(Pair<Text,TabletMergeability> split) { + return encode(split.getFirst(), split.getSecond()); + } + + public static Pair<Text,TabletMergeability> decode(ByteBuffer data) { + return decode(ByteBufferUtil.toString(data)); + } + + public static Pair<Text,TabletMergeability> decode(String data) { + GSonData jData = gson.fromJson(data, GSonData.class); + Preconditions.checkArgument(jData.never == (jData.delay == null), + "delay should both be set if and only if mergeability 'never' is false"); + return new Pair<>(new Text(jData.split), jData.never ? 
TabletMergeability.never() + : TabletMergeability.after(Duration.ofNanos(jData.delay))); + } + + private static class GSonData { + byte[] split; + boolean never; + Long delay; + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java index 0880e402d96..431d3bd8f2a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java @@ -58,6 +58,18 @@ public class TabletMergeabilityMetadata implements Serializable { return tabletMergeability; } + public boolean isNever() { + return tabletMergeability.isNever(); + } + + public boolean isAlways() { + return tabletMergeability.isAlways(); + } + + public Optional<Duration> getDelay() { + return tabletMergeability.getDelay(); + } + public Optional<SteadyTime> getSteadyTime() { return Optional.ofNullable(steadyTime); } @@ -134,6 +146,14 @@ public class TabletMergeabilityMetadata implements Serializable { return new TabletMergeabilityMetadata(TabletMergeability.after(delay), currentTime); } + public static TabletMergeabilityMetadata toMetadata(TabletMergeability mergeability, + SteadyTime currentTime) { + if (mergeability.isNever()) { + return TabletMergeabilityMetadata.never(); + } + return after(mergeability.getDelay().orElseThrow(), currentTime); + } + public static Value toValue(TabletMergeabilityMetadata tmm) { return new Value(tmm.toJson()); } diff --git a/core/src/test/java/org/apache/accumulo/core/client/admin/NewTableConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/client/admin/NewTableConfigurationTest.java index f315ca5f426..229ba77f11a 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/admin/NewTableConfigurationTest.java +++ 
b/core/src/test/java/org/apache/accumulo/core/client/admin/NewTableConfigurationTest.java @@ -19,16 +19,21 @@ package org.apache.accumulo.core.client.admin; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.client.IteratorSetting; @@ -36,6 +41,7 @@ import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.hadoop.io.Text; @@ -74,7 +80,35 @@ public class NewTableConfigurationTest { @Test public void testWithAndGetSplits() { NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); - Collection<Text> ntcSplits = ntc.getSplits(); + verifySplits(ntc.getSplits()); + } + + @Test + public void testWithSplitsMap() { + // Test map with setting splits to all the same TabletMergeability + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withSplits( + TabletMergeabilityUtil.splitsWithDefault(this.splits, TabletMergeability.never())); + verifySplits(ntc.getSplits()); + assertTrue( + ntc.getSplitsMap().values().stream().allMatch(tm -> tm.equals(TabletMergeability.never()))); + + // Test a mixture of 
all types of TabletMergeability + SortedMap<Text,TabletMergeability> splits = new TreeMap<>(); + splits.put(new Text("s1"), TabletMergeability.never()); + splits.put(new Text("s2"), TabletMergeability.always()); + splits.put(new Text("s3"), TabletMergeability.after(Duration.ofHours(3))); + splits.put(new Text("s4"), TabletMergeability.after(Duration.ofDays(2))); + + // Want test to fail if NewTableConfiguration were to try to modify the map passed to it. + splits = Collections.unmodifiableSortedMap(splits); + ntc = new NewTableConfiguration(); + ntc.withSplits(splits); + assertNotSame(splits, ntc.getSplitsMap()); + assertEquals(splits, ntc.getSplitsMap()); + } + + private void verifySplits(Collection<Text> ntcSplits) { Iterator<Text> splitIt = splits.iterator(); Iterator<Text> ntcIt = ntcSplits.iterator(); while (splitIt.hasNext() && ntcIt.hasNext()) { @@ -100,7 +134,6 @@ public class NewTableConfigurationTest { NewTableConfiguration ntc2 = new NewTableConfiguration(); Collection<Text> splits = ntc2.getSplits(); assertTrue(splits.isEmpty()); - } @Test diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsHelperTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsHelperTest.java index ce538428d23..6a01b4da083 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsHelperTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/TableOperationsHelperTest.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.function.Consumer; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.SummaryRetriever; import org.apache.accumulo.core.client.admin.TabletAvailability; +import 
org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.data.Range; @@ -80,6 +82,9 @@ public class TableOperationsHelperTest { @Override public void addSplits(String tableName, SortedSet<Text> partitionKeys) {} + @Override + public void putSplits(String tableName, SortedMap<Text,TabletMergeability> partitionKeys) {} + @Override public Collection<Text> listSplits(String tableName) { return null; diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java new file mode 100644 index 00000000000..42564322fe2 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtilTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.clientImpl; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.UUID; + +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class TabletMergeabilityUtilTest { + + @Test + public void testEncodeDecode() { + var text = getRandomText(); + String json = TabletMergeabilityUtil.encode(text, TabletMergeability.never()); + var decoded = TabletMergeabilityUtil.decode(json); + assertEquals(text, decoded.getFirst()); + assertEquals(TabletMergeability.never(), decoded.getSecond()); + } + + @Test + public void testEncodeDecodeAlways() { + var text = getRandomText(); + String json = TabletMergeabilityUtil.encode(text, TabletMergeability.always()); + var decoded = TabletMergeabilityUtil.decode(json); + assertEquals(text, decoded.getFirst()); + assertEquals(TabletMergeability.always(), decoded.getSecond()); + } + + @Test + public void testEncodeDecodeBuffer() { + var text = getRandomText(); + var jsonBuffer = + TabletMergeabilityUtil.encodeAsBuffer(text, TabletMergeability.after(Duration.ofDays(1))); + var decoded = TabletMergeabilityUtil.decode(jsonBuffer); + assertEquals(text, decoded.getFirst()); + assertEquals(TabletMergeability.after(Duration.ofDays(1)), decoded.getSecond()); + } + + private Text getRandomText() { + return new Text(String.valueOf(UUID.randomUUID()).replaceAll("-", "")); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 7cdde03dc52..4efb6025204 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -34,7 +34,6 @@ import static org.apache.accumulo.core.util.Validators.sameNamespaceAs; import 
java.io.IOException; import java.nio.ByteBuffer; -import java.util.Base64; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -42,8 +41,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -53,9 +52,11 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.InitialTableState; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.clientImpl.Namespaces; import org.apache.accumulo.core.clientImpl.TableOperationsImpl; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.clientImpl.UserCompactionUtils; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.clientImpl.thrift.TInfo; @@ -80,9 +81,9 @@ import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; -import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.Validator; import org.apache.accumulo.core.util.tables.TableNameUtil; @@ -257,7 +258,7 @@ class FateServiceHandler implements FateService.Iface { 
manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, - initialTabletAvailability, namespaceId, TabletMergeabilityMetadata.never())), + initialTabletAvailability, namespaceId, TabletMergeability.never())), autoCleanup, goalMessage); break; @@ -752,16 +753,18 @@ class FateServiceHandler implements FateService.Iface { endRow = endRow.getLength() == 0 ? null : endRow; prevEndRow = prevEndRow.getLength() == 0 ? null : prevEndRow; - SortedSet<Text> splits = arguments.subList(SPLIT_OFFSET, arguments.size()).stream() - .map(ByteBufferUtil::toText).collect(Collectors.toCollection(TreeSet::new)); + SortedMap<Text, + TabletMergeability> splits = arguments.subList(SPLIT_OFFSET, arguments.size()).stream() + .map(TabletMergeabilityUtil::decode).collect( + Collectors.toMap(Pair::getFirst, Pair::getSecond, (a, b) -> a, TreeMap::new)); KeyExtent extent = new KeyExtent(tableId, endRow, prevEndRow); Predicate<Text> outOfBoundsTest = split -> !extent.contains(split) || split.equals(extent.endRow()); - if (splits.stream().anyMatch(outOfBoundsTest)) { - splits.stream().filter(outOfBoundsTest).forEach(split -> log + if (splits.keySet().stream().anyMatch(outOfBoundsTest)) { + splits.keySet().stream().filter(outOfBoundsTest).forEach(split -> log .warn("split for {} is out of bounds : {}", extent, TextUtil.truncate(split))); throw new ThriftTableOperationException(tableId.canonical(), null, tableOp, @@ -774,8 +777,8 @@ class FateServiceHandler implements FateService.Iface { Predicate<Text> oversizedTest = split -> split.getLength() > maxSplitSize; - if (splits.stream().anyMatch(oversizedTest)) { - splits.stream().filter(oversizedTest) + if (splits.keySet().stream().anyMatch(oversizedTest)) { + splits.keySet().stream().filter(oversizedTest) .forEach(split -> log.warn( "split exceeds max configured split size len:{} max:{} extent:{} split:{}", split.getLength(), 
maxSplitSize, extent, TextUtil.truncate(split))); @@ -925,11 +928,11 @@ class FateServiceHandler implements FateService.Iface { final int splitCount, final int splitOffset) throws IOException { FileSystem fs = splitsPath.getFileSystem(manager.getContext().getHadoopConf()); try (FSDataOutputStream stream = fs.create(splitsPath)) { - // base64 encode because splits can contain binary for (int i = splitOffset; i < splitCount + splitOffset; i++) { + // This is already encoded as json byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i)); - String encodedSplit = Base64.getEncoder().encodeToString(splitBytes); - stream.write((encodedSplit + '\n').getBytes(UTF_8)); + stream.write(splitBytes); + stream.write("\n".getBytes(UTF_8)); } } catch (IOException e) { log.error("Error in FateServiceHandler while writing splits to {}: {}", splitsPath, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java index 8852a41cd94..f283d48ea13 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java @@ -23,10 +23,10 @@ import java.util.Map; import org.apache.accumulo.core.client.admin.InitialTableState; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.hadoop.fs.Path; public class TableInfo implements Serializable { @@ -52,7 +52,7 @@ public class TableInfo implements Serializable { private TabletAvailability initialTabletAvailability; - private TabletMergeabilityMetadata initialTabletMergeability; + private 
TabletMergeability defaultTabletMergeability; public TabletAvailability getInitialTabletAvailability() { return initialTabletAvailability; @@ -136,11 +136,11 @@ public class TableInfo implements Serializable { this.initialSplitSize = initialSplitSize; } - public TabletMergeabilityMetadata getInitialTabletMergeability() { - return initialTabletMergeability; + public TabletMergeability getDefaultTabletMergeability() { + return defaultTabletMergeability; } - public void setInitialTabletMergeability(TabletMergeabilityMetadata initialTabletMergeability) { - this.initialTabletMergeability = initialTabletMergeability; + public void setDefaultTabletMergeability(TabletMergeability defaultTabletMergeability) { + this.defaultTabletMergeability = defaultTabletMergeability; } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index 20a962cbbe1..8517503639e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -25,7 +25,9 @@ import java.math.BigInteger; import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -33,9 +35,11 @@ import java.util.function.Function; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.Namespace; import org.apache.accumulo.core.clientImpl.Namespaces; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import 
org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.data.AbstractId; @@ -48,6 +52,7 @@ import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooReservation; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.tables.TableNameUtil; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.server.ServerContext; @@ -277,4 +282,19 @@ public class Utils { return data; } + public static SortedMap<Text,TabletMergeability> getSortedSplitsFromFile(Manager manager, + Path path) throws IOException { + FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf()); + var data = new TreeMap<Text,TabletMergeability>(); + try (var file = new java.util.Scanner(fs.open(path), UTF_8)) { + while (file.hasNextLine()) { + String line = file.nextLine(); + log.trace("split line: {}", line); + Pair<Text,TabletMergeability> splitTm = TabletMergeabilityUtil.decode(line); + data.put(splitTm.getFirst(), splitTm.getSecond()); + } + } + return data; + } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java index 6532cb30fea..a80bb41c82c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java @@ -19,12 +19,15 @@ package org.apache.accumulo.manager.tableOps.create; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile; import java.io.IOException; +import java.util.SortedMap; import 
java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.manager.Manager; @@ -83,7 +86,8 @@ class ChooseDir extends ManagerRepo { * to the file system for later use during this FATE operation. */ private void createTableDirectoriesInfo(Manager manager) throws IOException { - SortedSet<Text> splits = Utils.getSortedSetFromFile(manager, tableInfo.getSplitPath(), true); + SortedMap<Text,TabletMergeability> splits = + Utils.getSortedSplitsFromFile(manager, tableInfo.getSplitPath()); SortedSet<Text> tabletDirectoryInfo = createTabletDirectoriesSet(manager, splits.size()); writeTabletDirectoriesToFileSystem(manager, tabletDirectoryInfo); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java index f0d74597db6..167d554a858 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.accumulo.core.client.admin.InitialTableState; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.data.NamespaceId; @@ -30,7 +31,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; -import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; 
import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.TableInfo; @@ -49,7 +49,7 @@ public class CreateTable extends ManagerRepo { public CreateTable(String user, String tableName, TimeType timeType, Map<String,String> props, Path splitPath, int splitCount, Path splitDirsPath, InitialTableState initialTableState, TabletAvailability initialTabletAvailability, NamespaceId namespaceId, - TabletMergeabilityMetadata initialTabletMergeability) { + TabletMergeability defaultTabletMergeability) { tableInfo = new TableInfo(); tableInfo.setTableName(tableName); tableInfo.setTimeType(timeType); @@ -61,7 +61,7 @@ public class CreateTable extends ManagerRepo { tableInfo.setInitialTableState(initialTableState); tableInfo.setSplitDirsPath(splitDirsPath); tableInfo.setInitialTabletAvailability(initialTabletAvailability); - tableInfo.setInitialTabletMergeability(initialTabletMergeability); + tableInfo.setDefaultTabletMergeability(defaultTabletMergeability); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index 2b987c7a4e0..c0f1ab3a9b5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -18,19 +18,26 @@ */ package org.apache.accumulo.manager.tableOps.create; +import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile; + +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.stream.Stream; +import 
org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.TableInfo; @@ -58,29 +65,34 @@ class PopulateMetadata extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager env) throws Exception { - SortedSet<Text> splits; + SortedMap<Text,TabletMergeability> splits; Map<Text,Text> splitDirMap; if (tableInfo.getInitialSplitSize() > 0) { - splits = Utils.getSortedSetFromFile(env, tableInfo.getSplitPath(), true); + splits = Utils.getSortedSplitsFromFile(env, tableInfo.getSplitPath()); SortedSet<Text> dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false); splitDirMap = createSplitDirectoryMap(splits, dirs); } else { - splits = new TreeSet<>(); + splits = new TreeMap<>(); splitDirMap = Map.of(); } - writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getManagerLock()); + writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getSteadyTime()); return new FinishCreateTable(tableInfo); } - private void writeSplitsToMetadataTable(ServerContext context, SortedSet<Text> splits, - Map<Text,Text> data, ServiceLock lock) { + private void writeSplitsToMetadataTable(ServerContext context, + SortedMap<Text,TabletMergeability> splits, Map<Text,Text> data, SteadyTime steadyTime) { try (var tabletsMutator = context.getAmple().mutateTablets()) { Text prevSplit = null; - Iterable<Text> iter = () 
-> Stream.concat(splits.stream(), Stream.of((Text) null)).iterator(); - for (Text split : iter) { + Iterable<Entry<Text,TabletMergeability>> iter = + () -> Stream.concat(splits.entrySet().stream(), + Stream.of(new SimpleImmutableEntry<Text,TabletMergeability>(null, + tableInfo.getDefaultTabletMergeability()))) + .iterator(); + for (Entry<Text,TabletMergeability> entry : iter) { + var split = entry.getKey(); var extent = new KeyExtent(tableInfo.getTableId(), split, prevSplit); var tabletMutator = tabletsMutator.mutateTablet(extent); @@ -92,7 +104,8 @@ class PopulateMetadata extends ManagerRepo { tabletMutator.putDirName(dirName); tabletMutator.putTime(new MetadataTime(0, tableInfo.getTimeType())); tabletMutator.putTabletAvailability(tableInfo.getInitialTabletAvailability()); - tabletMutator.putTabletMergeability(tableInfo.getInitialTabletMergeability()); + tabletMutator.putTabletMergeability( + TabletMergeabilityMetadata.toMetadata(entry.getValue(), steadyTime)); tabletMutator.mutate(); prevSplit = split; @@ -109,10 +122,11 @@ class PopulateMetadata extends ManagerRepo { /** * Create a map containing an association between each split directory and a split value. 
*/ - private Map<Text,Text> createSplitDirectoryMap(SortedSet<Text> splits, SortedSet<Text> dirs) { + private Map<Text,Text> createSplitDirectoryMap(SortedMap<Text,TabletMergeability> splits, + SortedSet<Text> dirs) { Preconditions.checkArgument(splits.size() == dirs.size()); Map<Text,Text> data = new HashMap<>(); - Iterator<Text> s = splits.iterator(); + Iterator<Text> s = splits.keySet().iterator(); Iterator<Text> d = dirs.iterator(); while (s.hasNext() && d.hasNext()) { data.put(s.next(), d.next()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java index 0b090cbcd2d..978a96edf34 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java @@ -99,7 +99,7 @@ public class AllocateDirsAndEnsureOnline extends ManagerRepo { // same dir name each time it runs again making it idempotent. 
List<String> dirs = new ArrayList<>(); - splitInfo.getSplits().forEach(split -> { + splitInfo.getSplits().keySet().forEach(split -> { String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split); dirs.add(dirName); log.trace("{} allocated dir name {}", fateId, dirName); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index 9748f2d21ff..13070cc7863 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -52,7 +52,7 @@ public class DeleteOperationIds extends ManagerRepo { Ample.RejectionHandler rejectionHandler = tabletMetadata -> !opid.equals(tabletMetadata.getOperationId()); - splitInfo.getTablets().forEach(extent -> { + splitInfo.getTablets().keySet().forEach(extent -> { tabletsMutator.mutateTablet(extent).requireOperation(opid).requireAbsentLocation() .requireAbsentLogs().deleteOperation().submit(rejectionHandler); }); @@ -72,7 +72,7 @@ public class DeleteOperationIds extends ManagerRepo { manager.getEventCoordinator().event(splitInfo.getOriginal(), "Added %d splits to %s", splitInfo.getSplits().size(), splitInfo.getOriginal()); - TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits()); + TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits().navigableKeySet()); } return null; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java index c614ce4be2e..6d7cbf1787c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java @@ -23,7 +23,7 
@@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.util.Optional; import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -48,7 +48,7 @@ public class FindSplits extends ManagerRepo { private final SplitInfo splitInfo; public FindSplits(KeyExtent extent) { - this.splitInfo = new SplitInfo(extent, new TreeSet<>(), true); + this.splitInfo = new SplitInfo(extent, new TreeMap<>()); } @Override @@ -156,7 +156,7 @@ public class FindSplits extends ManagerRepo { return null; } - return new PreSplit(extent, splits, true); + return new PreSplit(extent, splits); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 41d8d039ea7..226e9ebfbbb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -26,8 +26,11 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.SortedMap; import java.util.SortedSet; +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; @@ -50,16 +53,18 @@ public class PreSplit extends ManagerRepo { private final SplitInfo splitInfo; - public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits, boolean systemCreated) { + // Constructor used for system Splits + PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits) { + this(expectedExtent, 
TabletMergeabilityUtil.systemDefaultSplits(splits)); + } + + // Constructor used for user splits + public PreSplit(KeyExtent expectedExtent, SortedMap<Text,TabletMergeability> splits) { Objects.requireNonNull(expectedExtent); Objects.requireNonNull(splits); Preconditions.checkArgument(!splits.isEmpty()); Preconditions.checkArgument(!expectedExtent.isRootTablet()); - this.splitInfo = new SplitInfo(expectedExtent, splits, systemCreated); - } - - public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits) { - this(expectedExtent, splits, false); + this.splitInfo = new SplitInfo(expectedExtent, splits); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java index 14bac1fe7ef..fe71e91d555 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java @@ -18,10 +18,16 @@ */ package org.apache.accumulo.manager.tableOps.split; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.io.Serializable; -import java.util.SortedSet; -import java.util.TreeSet; +import java.nio.ByteBuffer; +import java.util.NavigableMap; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.util.TextUtil; @@ -36,19 +42,18 @@ public class SplitInfo implements Serializable { private final byte[] prevEndRow; private final byte[] endRow; private final byte[][] splits; - private final boolean systemCreated; - public SplitInfo(KeyExtent extent, SortedSet<Text> splits, boolean systemCreated) { + public SplitInfo(KeyExtent extent, 
SortedMap<Text,TabletMergeability> splits) { this.tableId = extent.tableId(); this.prevEndRow = extent.prevEndRow() == null ? null : TextUtil.getBytes(extent.prevEndRow()); this.endRow = extent.endRow() == null ? null : TextUtil.getBytes(extent.endRow()); this.splits = new byte[splits.size()][]; - this.systemCreated = systemCreated; int index = 0; - for (var split : splits) { - Preconditions.checkArgument(extent.contains(split)); - this.splits[index] = TextUtil.getBytes(split); + for (var split : splits.entrySet()) { + Preconditions.checkArgument(extent.contains(split.getKey())); + this.splits[index] = + TabletMergeabilityUtil.encode(split.getKey(), split.getValue()).getBytes(UTF_8); index++; } } @@ -61,33 +66,32 @@ public class SplitInfo implements Serializable { return new KeyExtent(tableId, toText(endRow), toText(prevEndRow)); } - SortedSet<Text> getSplits() { - SortedSet<Text> splits = new TreeSet<>(); + NavigableMap<Text,TabletMergeability> getSplits() { + NavigableMap<Text,TabletMergeability> splits = new TreeMap<>(); for (int i = 0; i < this.splits.length; i++) { - splits.add(new Text(this.splits[i])); + var split = TabletMergeabilityUtil.decode(ByteBuffer.wrap(this.splits[i])); + splits.put(split.getFirst(), split.getSecond()); } return splits; } - SortedSet<KeyExtent> getTablets() { + NavigableMap<KeyExtent,TabletMergeability> getTablets() { Text prev = getOriginal().prevEndRow(); - TreeSet<KeyExtent> tablets = new TreeSet<>(); + NavigableMap<KeyExtent,TabletMergeability> tablets = new TreeMap<>(); - for (var split : getSplits()) { + for (var entry : getSplits().entrySet()) { + var split = entry.getKey(); var extent = new KeyExtent(getOriginal().tableId(), split, prev); prev = split; - tablets.add(extent); + tablets.put(extent, entry.getValue()); } var extent = new KeyExtent(getOriginal().tableId(), getOriginal().endRow(), prev); - tablets.add(extent); + tablets.put(extent, null); return tablets; } - boolean isSystemCreated() { - return systemCreated; - } 
} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 7da6f36af6d..5a6e835119a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -22,11 +22,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Objects; -import java.util.Set; -import java.util.SortedSet; +import java.util.SortedMap; import java.util.TreeMap; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; @@ -68,8 +69,8 @@ public class UpdateTablets extends ManagerRepo { if (tabletMetadata == null) { // check to see if this operation has already succeeded. - TabletMetadata newTabletMetadata = - manager.getContext().getAmple().readTablet(splitInfo.getTablets().last()); + TabletMetadata newTabletMetadata = manager.getContext().getAmple() + .readTablet(splitInfo.getTablets().navigableKeySet().last()); if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) { // have already created the new tablet and failed before we could return the next step, so @@ -123,12 +124,12 @@ public class UpdateTablets extends ManagerRepo { * split. 
*/ static Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> getNewTabletFiles( - Set<KeyExtent> newTablets, TabletMetadata tabletMetadata, + SortedMap<KeyExtent,TabletMergeability> newTablets, TabletMetadata tabletMetadata, Function<StoredTabletFile,Splitter.FileInfo> fileInfoProvider) { Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> tabletsFiles = new TreeMap<>(); - newTablets.forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); + newTablets.keySet().forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); // determine which files overlap which tablets and their estimated sizes tabletMetadata.getFilesMap().forEach((file, dataFileValue) -> { @@ -157,7 +158,7 @@ public class UpdateTablets extends ManagerRepo { } // count how many of the new tablets the file will overlap - double numOverlapping = newTablets.stream().map(KeyExtent::toDataRange) + double numOverlapping = newTablets.keySet().stream().map(KeyExtent::toDataRange) .filter(range -> range.clip(fileRange, true) != null).count(); Preconditions.checkState(numOverlapping > 0); @@ -167,7 +168,7 @@ public class UpdateTablets extends ManagerRepo { double entriesPerTablet = dataFileValue.getNumEntries() / numOverlapping; // add the file to the tablets it overlaps - newTablets.forEach(newTablet -> { + newTablets.keySet().forEach(newTablet -> { if (newTablet.toDataRange().clip(fileRange, true) != null) { DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet, dataFileValue.getTime()); @@ -194,13 +195,16 @@ public class UpdateTablets extends ManagerRepo { } private void addNewTablets(FateId fateId, Manager manager, TabletMetadata tabletMetadata, - TabletOperationId opid, SortedSet<KeyExtent> newTablets, + TabletOperationId opid, NavigableMap<KeyExtent,TabletMergeability> newTablets, Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) { Iterator<String> dirNameIter = dirNames.iterator(); try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) { - for (var newExtent : newTablets) { - if (newExtent.equals(newTablets.last())) { + for (var entry : newTablets.entrySet()) { + var newExtent = entry.getKey(); + var mergeability = entry.getValue(); + + if (newExtent.equals(newTablets.navigableKeySet().last())) { // Skip the last tablet, its done after successfully adding all new tablets continue; } @@ -219,9 +223,12 @@ public class UpdateTablets extends ManagerRepo { .debug("{} copying compacted marker to new child tablet {}", fateId, compactedFateId)); mutator.putTabletAvailability(tabletMetadata.getTabletAvailability()); + + // Null is only expected for the last tablet which is skipped + Preconditions.checkState(mergeability != null, + "Null TabletMergeability for extent %s is unexpected", newExtent); mutator.putTabletMergeability( - splitInfo.isSystemCreated() ? TabletMergeabilityMetadata.always(manager.getSteadyTime()) - : TabletMergeabilityMetadata.never()); + TabletMergeabilityMetadata.toMetadata(mergeability, manager.getSteadyTime())); tabletMetadata.getLoaded().forEach((k, v) -> mutator.putBulkFile(k.getTabletFile(), v)); newTabletsFiles.get(newExtent).forEach(mutator::putFile); @@ -240,10 +247,10 @@ public class UpdateTablets extends ManagerRepo { } private void updateExistingTablet(FateId fateId, Manager manager, TabletMetadata tabletMetadata, - TabletOperationId opid, SortedSet<KeyExtent> newTablets, + TabletOperationId opid, NavigableMap<KeyExtent,TabletMergeability> newTablets, Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) { try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - var newExtent = newTablets.last(); + var newExtent = newTablets.navigableKeySet().last(); var mutator = tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid) .requireAbsentLocation().requireAbsentLogs(); diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 41f9824b7ce..66f8665fbe6 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -28,12 +28,18 @@ import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -139,7 +145,8 @@ public class UpdateTabletsTest { var expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); - Set<KeyExtent> newExtents = Set.of(ke1, ke2, ke3, ke4); + SortedMap<KeyExtent,TabletMergeability> newExtents = new TreeMap<>(Set.of(ke1, ke2, ke3, ke4) + .stream().collect(Collectors.toMap(Function.identity(), e -> TabletMergeability.never()))); TabletMetadata tabletMeta = EasyMock.createMock(TabletMetadata.class); EasyMock.expect(tabletMeta.getFilesMap()).andReturn(tabletFiles).anyTimes(); @@ -382,8 +389,9 @@ public class UpdateTabletsTest { // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet SortedSet<Text> splits = new TreeSet<>(List.of(newExtent1.endRow(), newExtent2.endRow())); - UpdateTablets updateTablets = - new UpdateTablets(new 
SplitInfo(origExtent, splits, true), List.of(dir1, dir2)); + UpdateTablets updateTablets = new UpdateTablets( + new SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), + List.of(dir1, dir2)); updateTablets.call(fateId, manager); EasyMock.verify(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator, @@ -461,8 +469,9 @@ public class UpdateTabletsTest { // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"))); - UpdateTablets updateTablets = - new UpdateTablets(new SplitInfo(origExtent, splits, true), List.of("d1")); + UpdateTablets updateTablets = new UpdateTablets( + new SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), + List.of("d1")); updateTablets.call(fateId, manager); EasyMock.verify(manager, context, ample); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java index 3b524d90bcb..617eba0ba16 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -197,8 +198,8 @@ public class ManagerRepoIT extends SharedMiniClusterBase { assertEquals(opid, testAmple.readTablet(extent).getOperationId()); - var eoRepo = new AllocateDirsAndEnsureOnline( - new SplitInfo(extent, new TreeSet<>(List.of(new 
Text("sp1"))), true)); + var eoRepo = new AllocateDirsAndEnsureOnline(new SplitInfo(extent, + TabletMergeabilityUtil.systemDefaultSplits(new TreeSet<>(List.of(new Text("sp1")))))); // The repo should delete the opid and throw an exception assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java index 72ff2bb8ca3..0e6e742d754 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java @@ -24,12 +24,15 @@ import java.time.Duration; import java.util.Collection; import java.util.Iterator; import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; @@ -102,6 +105,41 @@ public class AddSplitIT extends AccumuloClusterHarness { } } + /** Creates a table, adds four splits through putSplits (mapped to TabletMergeability always, never, after 100 seconds and after 1 day), then asserts that listSplits returns exactly those rows and that each tablet's stored mergeability equals the value given in the map; the tablet with a null end row is expected to be never. */ @Test + public void addSplitWithMergeabilityTest() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(tableName); + + SortedMap<Text,TabletMergeability> splits = new TreeMap<>(); + splits.put(new Text(String.format("%09d", 333)), TabletMergeability.always()); + splits.put(new Text(String.format("%09d", 666)), TabletMergeability.never()); + splits.put(new Text(String.format("%09d", 888)), + TabletMergeability.after(Duration.ofSeconds(100))); + splits.put(new Text(String.format("%09d", 999)), +
TabletMergeability.after(Duration.ofDays(1))); + + c.tableOperations().putSplits(tableName, splits); + Thread.sleep(100); /* NOTE(review): fixed 100 ms pause before listing the splits; the reason is not visible in this hunk - confirm it is still needed */ + assertEquals(splits.keySet(), new TreeSet<>(c.tableOperations().listSplits(tableName))); + + /* Read the table's tablet metadata directly so the mergeability stored for each tablet can be compared with the requested value. */ TableId id = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + try (TabletsMetadata tm = getServerContext().getAmple().readTablets().forTable(id).build()) { + tm.stream().forEach(t -> { + // default tablet should be set to never + if (t.getEndRow() == null) { + assertEquals(TabletMergeability.never(), + t.getTabletMergeability().getTabletMergeability()); + } else { + // New splits should match the original setting in the map + assertEquals(splits.get(t.getEndRow()), + t.getTabletMergeability().getTabletMergeability()); + } + }); + } + } + } + private void verifyData(AccumuloClient client, String tableName, long ts) throws Exception { try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java index 0b9b7e64c73..e3e99f2c38e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java @@ -26,7 +26,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.Base64; import java.util.Collection; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; @@ -37,6 +39,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletMergeability; import
org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.MemoryUnit; @@ -108,6 +111,36 @@ public class CreateInitialSplitsIT extends AccumuloClusterHarness { runTest(generateNonBinarySplits(3000, 32)); } + /** Creates a table from a NewTableConfiguration carrying splits built in a 10-iteration loop with encode(getRandomText(32), false), each mapped to TabletMergeability.after(10 seconds); asserts the table exists afterwards, that listSplits equals the requested keys, that a tablet with a null end row is marked never, and that 10 tablets carry the 10 second delay. */ @Test + public void testCreateInitialSplitsWithMergeability() throws TableExistsException, + AccumuloSecurityException, AccumuloException, TableNotFoundException { + tableName = getUniqueNames(1)[0]; + + SortedMap<Text,TabletMergeability> splits = new TreeMap<>(); + var splitDuration = Duration.ofSeconds(10); + for (int i = 0; i < 10; i++) { + splits.put(encode(getRandomText(32), false), TabletMergeability.after(splitDuration)); + } + + NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); + assertFalse(client.tableOperations().exists(tableName)); + client.tableOperations().create(tableName, ntc); + assertTrue(client.tableOperations().exists(tableName)); + Collection<Text> createdSplits = client.tableOperations().listSplits(tableName); + assertEquals(splits.keySet(), new TreeSet<>(createdSplits)); + + var tableId = getServerContext().getTableId(tableName); + try (var tablets = getServerContext().getAmple().readTablets().forTable(tableId).build()) { + // default tablet (null end row) should have a default TabletMergeability of never for user + // created tablets + assertTrue(tablets.stream() + .anyMatch(tm -> tm.getEndRow() == null && tm.getTabletMergeability().isNever())); + // other splits should be created with a duration of 10 seconds + /* NOTE(review): stream() is called a second time on the same tablets object; presumably each call starts a fresh read - confirm. The expected count of 10 mirrors the loop bound above. */ assertEquals(10, tablets.stream().filter(tm -> tm.getTabletMergeability().getDelay() + .map(delay -> delay.equals(splitDuration)).orElse(false)).count()); + } + } + @Test public void testCreateInitialSplitsWithEncodedSplits() throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {