This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 490a4634f6 Move the tracking of unsplittable tablets to metadata table (#4317) 490a4634f6 is described below commit 490a4634f6a299b26287bda7ae596572b2ee0808 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Mar 8 17:04:12 2024 -0500 Move the tracking of unsplittable tablets to metadata table (#4317) This adds a new column to store information for tracking unsplittable tablets in the metadata table instead of in memory. This information can be used by the tablet management iterator to know if a tablet needs to split and avoid unnecessarily trying to split a tablet that can't be split. The data stored includes a hash of the file set and the relevant config related to splits and if this changes then the iterator will try and split again and retest. Co-authored-by: Keith Turner <ktur...@apache.org> --- .../accumulo/core/metadata/schema/Ample.java | 4 + .../core/metadata/schema/MetadataSchema.java | 13 ++ .../core/metadata/schema/TabletMetadata.java | 36 ++++- .../metadata/schema/TabletMetadataBuilder.java | 13 ++ .../core/metadata/schema/TabletMutatorBase.java | 13 ++ .../core/metadata/schema/TabletsMetadata.java | 4 + .../core/metadata/schema/UnSplittableMetadata.java | 116 ++++++++++++++ .../java/org/apache/accumulo/core/util/Merge.java | 7 +- .../core/metadata/schema/TabletMetadataTest.java | 110 ++++++++++++- .../server/constraints/MetadataConstraints.java | 20 ++- .../manager/state/TabletManagementIterator.java | 37 +++-- .../apache/accumulo/server}/split/SplitUtils.java | 46 +++++- .../constraints/MetadataConstraintsTest.java | 60 ++++++- .../accumulo/server}/split/SplitUtilsTest.java | 2 +- .../manager/tableOps/split/FindSplits.java | 82 +++++++++- .../manager/tableOps/split/UpdateTablets.java | 6 + .../manager/tableOps/split/UpdateTabletsTest.java | 7 +- .../org/apache/accumulo/test/LargeSplitRowIT.java | 177 ++++++++++++++++++++- 18 files changed, 708 
insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index d3f27a7f36..d85553f75e 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -419,6 +419,10 @@ public interface Ample { T putUserCompactionRequested(FateId fateId); T deleteUserCompactionRequested(FateId fateId); + + T setUnSplittable(UnSplittableMetadata unSplittableMeta); + + T deleteUnSplittable(); } interface TabletMutator extends TabletUpdates<TabletMutator> { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 46cbfb7b06..b093a4bccf 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -430,6 +430,19 @@ public class MetadataSchema { public static final Text NAME = new Text(STR_NAME); } + /** + * This family is used to track information needed for splits. Currently, the only thing stored + * is whether a tablet is un-splittable, based on the tablet's files and the configuration related + * to splits. + */ + public static class SplitColumnFamily { + public static final String STR_NAME = "split"; + public static final Text NAME = new Text(STR_NAME); + public static final String UNSPLITTABLE_QUAL = "unsplittable"; + public static final ColumnFQ UNSPLITTABLE_COLUMN = + new ColumnFQ(NAME, new Text(UNSPLITTABLE_QUAL)); + } + // TODO when removing the Upgrader12to13 class in the upgrade package, also remove this class.
public static class Upgrade12to13 { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index ccdb2acaec..be7d2fb056 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; @@ -82,6 +83,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -120,6 +123,8 @@ public class TabletMetadata { private boolean futureAndCurrentLocationSet = false; private Set<FateId> compacted; private Set<FateId> userCompactionsRequested; + private UnSplittableMetadata unSplittableMetadata; + private Supplier<Long> fileSize; public static TabletMetadataBuilder builder(KeyExtent extent) { return new TabletMetadataBuilder(extent); @@ -150,7 +155,8 @@ 
public class TabletMetadata { OPID, SELECTED, COMPACTED, - USER_COMPACTION_REQUESTED + USER_COMPACTION_REQUESTED, + UNSPLITTABLE } public static class Location { @@ -316,6 +322,14 @@ public class TabletMetadata { return files; } + /** + * @return the sum of the tablets files sizes + */ + public long getFileSize() { + ensureFetched(ColumnType.FILES); + return fileSize.get(); + } + public SelectedFiles getSelectedFiles() { ensureFetched(ColumnType.SELECTED); return selectedFiles; @@ -381,6 +395,11 @@ public class TabletMetadata { return onDemandHostingRequested; } + public UnSplittableMetadata getUnSplittable() { + ensureFetched(ColumnType.UNSPLITTABLE); + return unSplittableMetadata; + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("tableId", tableId) @@ -394,7 +413,8 @@ public class TabletMetadata { .append("onDemandHostingRequested", onDemandHostingRequested) .append("operationId", operationId).append("selectedFiles", selectedFiles) .append("futureAndCurrentLocationSet", futureAndCurrentLocationSet) - .append("userCompactionsRequested", userCompactionsRequested).toString(); + .append("userCompactionsRequested", userCompactionsRequested) + .append("unSplittableMetadata", unSplittableMetadata).toString(); } public SortedMap<Key,Value> getKeyValues() { @@ -545,6 +565,13 @@ public class TabletMetadata { case UserCompactionRequestedColumnFamily.STR_NAME: userCompactionsRequestedBuilder.add(FateId.from(qual)); break; + case SplitColumnFamily.STR_NAME: + if (qual.equals(SplitColumnFamily.UNSPLITTABLE_QUAL)) { + te.unSplittableMetadata = UnSplittableMetadata.toUnSplittable(val); + } else { + throw new IllegalStateException("Unexpected SplitColumnFamily qualifier: " + qual); + } + break; default: throw new IllegalStateException("Unexpected family " + fam); @@ -557,7 +584,10 @@ public class TabletMetadata { te.availability = TabletAvailability.HOSTED; } - te.files = filesBuilder.build(); + var files = 
filesBuilder.build(); + te.files = files; + te.fileSize = + Suppliers.memoize(() -> files.values().stream().mapToLong(DataFileValue::getSize).sum()); te.loadedFiles = loadedFilesBuilder.build(); te.fetchedCols = fetchedColumns; te.scans = scansBuilder.build(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index 55a2107599..fb79fc8066 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -36,6 +36,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import java.util.Arrays; @@ -295,6 +296,18 @@ public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadata throw new UnsupportedOperationException(); } + @Override + public TabletMetadataBuilder setUnSplittable(UnSplittableMetadata unSplittableMeta) { + fetched.add(UNSPLITTABLE); + internalBuilder.setUnSplittable(unSplittableMeta); + return this; + } + + @Override + public TabletMetadataBuilder deleteUnSplittable() { + throw new UnsupportedOperationException(); + } + /** * @param extraFetched Anything that was put on the builder will automatically be added to the * fetched set. 
However, for the case where something was not put and it needs to be diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 190439ab2d..9cc8143dc3 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.La import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; @@ -347,6 +348,18 @@ public abstract class TabletMutatorBase<T extends Ample.TabletUpdates<T>> return getThis(); } + @Override + public T setUnSplittable(UnSplittableMetadata unSplittableMeta) { + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unSplittableMeta.toBase64())); + return getThis(); + } + + @Override + public T deleteUnSplittable() { + SplitColumnFamily.UNSPLITTABLE_COLUMN.putDelete(mutation); + return getThis(); + } + public void setCloseAfterMutate(AutoCloseable closeable) { this.closeAfterMutate = closeable; } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 
4fee556643..9e61d9103c 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -81,6 +81,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.La import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -396,6 +397,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable case USER_COMPACTION_REQUESTED: families.add(UserCompactionRequestedColumnFamily.NAME); break; + case UNSPLITTABLE: + qualifiers.add(SplitColumnFamily.UNSPLITTABLE_COLUMN); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java new file mode 100644 index 0000000000..5c53c82952 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/UnSplittableMetadata.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Base64; +import java.util.Objects; +import java.util.Set; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; + +import com.google.common.base.Preconditions; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +public class UnSplittableMetadata { + + private final HashCode hashOfSplitParameters; + + private UnSplittableMetadata(KeyExtent keyExtent, long splitThreshold, long maxEndRowSize, + int maxFilesToOpen, Set<StoredTabletFile> files) { + this(calculateSplitParamsHash(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, files)); + } + + private UnSplittableMetadata(HashCode hashOfSplitParameters) { + this.hashOfSplitParameters = Objects.requireNonNull(hashOfSplitParameters); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnSplittableMetadata that = (UnSplittableMetadata) o; + return Objects.equals(hashOfSplitParameters, that.hashOfSplitParameters); + } + + @Override + public int hashCode() { + return 
Objects.hash(hashOfSplitParameters); + } + + @Override + public String toString() { + return toBase64(); + } + + public String toBase64() { + return Base64.getEncoder().encodeToString(hashOfSplitParameters.asBytes()); + } + + @SuppressWarnings("UnstableApiUsage") + private static HashCode calculateSplitParamsHash(KeyExtent keyExtent, long splitThreshold, + long maxEndRowSize, int maxFilesToOpen, Set<StoredTabletFile> files) { + Preconditions.checkArgument(splitThreshold > 0, "splitThreshold must be greater than 0"); + Preconditions.checkArgument(maxEndRowSize > 0, "maxEndRowSize must be greater than 0"); + Preconditions.checkArgument(maxFilesToOpen > 0, "maxFilesToOpen must be greater than 0"); + + // Use static call to murmur3_128() so the seed is always the same + // Hashing.goodFastHash will seed with the current time, and we need the seed to be + // the same across restarts and instances + var hasher = Hashing.murmur3_128().newHasher(); + hasher.putBytes(serializeKeyExtent(keyExtent)).putLong(splitThreshold).putLong(maxEndRowSize) + .putInt(maxFilesToOpen); + files.stream().map(StoredTabletFile::getMetadata).sorted() + .forEach(path -> hasher.putString(path, UTF_8)); + return hasher.hash(); + } + + public static UnSplittableMetadata toUnSplittable(KeyExtent keyExtent, long splitThreshold, + long maxEndRowSize, int maxFilesToOpen, Set<StoredTabletFile> files) { + return new UnSplittableMetadata(keyExtent, splitThreshold, maxEndRowSize, maxFilesToOpen, + files); + } + + public static UnSplittableMetadata toUnSplittable(String base64HashOfSplitParameters) { + return new UnSplittableMetadata( + HashCode.fromBytes(Base64.getDecoder().decode(base64HashOfSplitParameters))); + } + + private static byte[] serializeKeyExtent(KeyExtent keyExtent) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + keyExtent.writeTo(dos); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw 
new UncheckedIOException(e); + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java index 4eba179640..c872086af7 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.AccumuloTable; -import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.hadoop.io.Text; @@ -162,10 +161,8 @@ public class Merge { .overRange(new KeyExtent(tableId, end, start).toMetaRange()).fetch(FILES, PREV_ROW) .build()) { - Iterator<Size> sizeIterator = tablets.stream().map(tm -> { - long size = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); - return new Size(tm.getExtent(), size); - }).iterator(); + Iterator<Size> sizeIterator = + tablets.stream().map(tm -> new Size(tm.getExtent(), tm.getFileSize())).iterator(); while (sizeIterator.hasNext()) { Size next = sizeIterator.next(); diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 8131a1a0df..e2fc8b8167 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -32,6 +32,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -52,6 +53,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -69,6 +71,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; @@ -131,6 +134,9 @@ public class TabletMetadataTest { MERGED_COLUMN.put(mutation, new Value()); mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, FateId.from(type, 17).canonical(), ""); + var unsplittableMeta = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2)); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new 
Value(unsplittableMeta.toBase64())); SortedMap<Key,Value> rowMap = toRowMap(mutation); @@ -143,6 +149,8 @@ public class TabletMetadataTest { assertEquals(extent, tm.getExtent()); assertEquals(Set.of(tf1, tf2), Set.copyOf(tm.getFiles())); assertEquals(Map.of(tf1, dfv1, tf2, dfv2), tm.getFilesMap()); + assertEquals(tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(), + tm.getFileSize()); assertEquals(6L, tm.getFlushId().getAsLong()); assertEquals(rowMap, tm.getKeyValues()); assertEquals(Map.of(new StoredTabletFile(bf1), fateId56L, new StoredTabletFile(bf2), fateId59L), @@ -162,6 +170,7 @@ public class TabletMetadataTest { assertEquals(Set.of(sf1, sf2), Set.copyOf(tm.getScans())); assertTrue(tm.hasMerged()); assertTrue(tm.getUserCompactionsRequested().contains(FateId.from(type, 17))); + assertEquals(unsplittableMeta, tm.getUnSplittable()); } @Test @@ -339,7 +348,92 @@ public class TabletMetadataTest { } @Test - public void testUnkownColFamily() { + public void testUnsplittableColumn() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); + StoredTabletFile sf2 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")); + StoredTabletFile sf3 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf")); + // Same path as sf3 but with a range + StoredTabletFile sf4 = + StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf"), new Range("a", "b")); + + // Test with files + var unsplittableMeta1 = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf3)); + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta1.toBase64())); + TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false);
assertUnsplittable(unsplittableMeta1, tm.getUnSplittable(), true); + + // Test empty file set + var unsplittableMeta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of()); + mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta2.toBase64())); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertUnsplittable(unsplittableMeta2, tm.getUnSplittable(), true); + + // Make sure not equals works as well + assertUnsplittable(unsplittableMeta1, unsplittableMeta2, false); + + // Test with ranges + // use sf4, which has the same path as sf3 plus a range, instead of sf3 + var unsplittableMeta3 = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf4)); + mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta3.toBase64())); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertUnsplittable(unsplittableMeta3, tm.getUnSplittable(), true); + + // make sure not equals when all the file paths are equal but one has a range + assertUnsplittable(unsplittableMeta1, unsplittableMeta3, false); + + // Column not set + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertNull(tm.getUnSplittable()); + + // Column not fetched + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.PREV_ROW), true, false); + assertThrows(IllegalStateException.class, tm::getUnSplittable); + } + + @Test + public void testUnsplittableWithRange() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da"));
+ // Files with same path and different ranges + StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); + StoredTabletFile sf2 = + StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"), new Range("a", "b")); + StoredTabletFile sf3 = + StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"), new Range("a", "d")); + + var meta1 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1)); + var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf2)); + var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf3)); + + // compare each against the others to make sure not equal + assertUnsplittable(meta1, meta2, false); + assertUnsplittable(meta1, meta3, false); + assertUnsplittable(meta2, meta3, false); + } + + private void assertUnsplittable(UnSplittableMetadata meta1, UnSplittableMetadata meta2, + boolean equal) { + assertEquals(equal, meta1.equals(meta2)); + assertEquals(equal, meta1.hashCode() == meta2.hashCode()); + assertEquals(equal, meta1.toBase64().equals(meta2.toBase64())); + } + + @Test + public void testUnknownColFamily() { KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); @@ -390,13 +484,15 @@ public class TabletMetadataTest { .putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, FateId.from(type, 25)) .putBulkFile(rf2, FateId.from(type, 35)).putFlushId(27).putDirName("dir1").putScan(sf3) .putScan(sf4).putCompacted(FateId.from(type, 17)).putCompacted(FateId.from(type, 23)) - .build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED); + .build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED, UNSPLITTABLE); assertEquals(extent, tm.getExtent()); assertEquals(TabletAvailability.UNHOSTED, tm.getTabletAvailability()); assertEquals(Location.future(ser1), tm.getLocation()); assertEquals(27L, 
tm.getFlushId().orElse(-1)); assertEquals(Map.of(sf1, dfv1, sf2, dfv2), tm.getFilesMap()); + assertEquals(tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(), + tm.getFileSize()); assertEquals(Map.of(rf1.insert(), FateId.from(type, 25L), rf2.insert(), FateId.from(type, 35L)), tm.getLoaded()); assertEquals("dir1", tm.getDirName()); @@ -406,6 +502,7 @@ public class TabletMetadataTest { assertFalse(tm.getHostingRequested()); assertTrue(tm.getUserCompactionsRequested().isEmpty()); assertFalse(tm.hasMerged()); + assertNull(tm.getUnSplittable()); assertThrows(IllegalStateException.class, tm::getOperationId); assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); @@ -429,6 +526,9 @@ public class TabletMetadataTest { assertThrows(IllegalStateException.class, tm2::getHostingRequested); assertThrows(IllegalStateException.class, tm2::getSelectedFiles); assertThrows(IllegalStateException.class, tm2::getCompacted); + assertThrows(IllegalStateException.class, tm2::hasMerged); + assertThrows(IllegalStateException.class, tm2::getUserCompactionsRequested); + assertThrows(IllegalStateException.class, tm2::getUnSplittable); var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); CompactionMetadata ecm = new CompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", @@ -438,11 +538,14 @@ public class TabletMetadataTest { LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, FateId.from(type, 159L)); + var unsplittableMeta = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2)); TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1) .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged() - .putUserCompactionRequested(FateId.from(type, 159L)).build(); + 
.putUserCompactionRequested(FateId.from(type, 159L)).setUnSplittable(unsplittableMeta) + .build(); assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet()); assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles()); @@ -458,6 +561,7 @@ public class TabletMetadataTest { assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue()); assertTrue(tm3.hasMerged()); assertTrue(tm3.getUserCompactionsRequested().contains(FateId.from(type, 159L))); + assertEquals(unsplittableMeta, tm3.getUnSplittable()); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index abb1c7a27d..c6a30040a1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -52,12 +52,14 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Upgrade12to13; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import 
org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.cleaner.CleanerUtil; import org.apache.accumulo.server.ServerContext; @@ -95,6 +97,7 @@ public class MetadataConstraints implements Constraint { TabletColumnFamily.AVAILABILITY_COLUMN, TabletColumnFamily.REQUESTED_COLUMN, ServerColumnFamily.SELECTED_COLUMN, + SplitColumnFamily.UNSPLITTABLE_COLUMN, Upgrade12to13.COMPACT_COL); @SuppressWarnings("deprecation") @@ -270,11 +273,20 @@ public class MetadataConstraints implements Constraint { } catch (RuntimeException e) { violations = addViolation(violations, 11); } - } else if (CompactedColumnFamily.NAME.equals(columnFamily) - || UserCompactionRequestedColumnFamily.NAME.equals(columnFamily)) { + } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { if (!FateId.isFateId(columnQualifier.toString())) { violations = addViolation(violations, 13); } + } else if (UserCompactionRequestedColumnFamily.NAME.equals(columnFamily)) { + if (!FateId.isFateId(columnQualifier.toString())) { + violations = addViolation(violations, 14); + } + } else if (SplitColumnFamily.UNSPLITTABLE_COLUMN.equals(columnFamily, columnQualifier)) { + try { + UnSplittableMetadata.toUnSplittable(new String(columnUpdate.getValue(), UTF_8)); + } catch (RuntimeException e) { + violations = addViolation(violations, 15); + } } else if (columnFamily.equals(BulkFileColumnFamily.NAME)) { if (!columnUpdate.isDeleted() && !checkedBulk) { /* @@ -435,6 +447,10 @@ public class MetadataConstraints implements Constraint { return "Invalid data file metadata format"; case 13: return "Invalid compacted column"; + case 14: + return "Invalid user compaction requested column"; + case 15: + return "Invalid unsplittable column"; } return null; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index df37df441c..4704f691c8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.SortedMap; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.PluginEnvironment.Configuration; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; @@ -44,9 +45,9 @@ import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.compaction.CompactionKind; @@ -54,6 +55,7 @@ import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; +import org.apache.accumulo.server.split.SplitUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,12 +71,28 @@ public class TabletManagementIterator extends SkippingIterator { private TabletBalancer balancer; private static boolean 
shouldReturnDueToSplit(final TabletMetadata tm, - final long splitThreshold) { - final long sumOfFileSizes = - tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); - final boolean shouldSplit = sumOfFileSizes > splitThreshold; - LOG.trace("{} should split? sum: {}, threshold: {}, result: {}", tm.getExtent(), sumOfFileSizes, - splitThreshold, shouldSplit); + final Configuration tableConfig) { + + final long splitThreshold = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + final long maxEndRowSize = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConfig.get(Property.TABLE_MAX_END_ROW_SIZE.getKey())); + final int maxFilesToOpen = (int) ConfigurationTypeHelper.getFixedMemoryAsBytes( + tableConfig.get(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN.getKey())); + + // If the current computed metadata matches the current marker then we can't split, + // so we return false. If the marker is set but doesn't match then return true + // which gives a chance to clean up the marker and recheck. + var unsplittable = tm.getUnSplittable(); + if (unsplittable != null) { + return !unsplittable.equals(UnSplittableMetadata.toUnSplittable(tm.getExtent(), + splitThreshold, maxEndRowSize, maxFilesToOpen, tm.getFiles())); + } + + // If unsplittable is not set at all then check if over split threshold + final boolean shouldSplit = SplitUtils.needsSplit(tableConfig, tm); + LOG.trace("{} should split? 
sum: {}, threshold: {}, result: {}", tm.getExtent(), + tm.getFileSize(), splitThreshold, shouldSplit); return shouldSplit; } @@ -255,10 +273,7 @@ public class TabletManagementIterator extends SkippingIterator { if (tm.getOperationId() == null && Collections.disjoint(REASONS_NOT_TO_SPLIT_OR_COMPACT, reasonsToReturnThisTablet)) { try { - final long splitThreshold = - ConfigurationTypeHelper.getFixedMemoryAsBytes(this.env.getPluginEnv() - .getConfiguration(tm.getTableId()).get(Property.TABLE_SPLIT_THRESHOLD.getKey())); - if (shouldReturnDueToSplit(tm, splitThreshold)) { + if (shouldReturnDueToSplit(tm, this.env.getPluginEnv().getConfiguration(tm.getTableId()))) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_SPLITTING); } // important to call this since reasonsToReturnThisTablet is passed to it diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java similarity index 82% rename from server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java rename to server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java index bce10badb2..fcf4e2422f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.accumulo.manager.split; +package org.apache.accumulo.server.split; import java.io.IOException; import java.io.UncheckedIOException; @@ -29,6 +29,8 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.function.Predicate; +import org.apache.accumulo.core.client.PluginEnvironment.Configuration; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -39,8 +41,8 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.hadoop.fs.FileSystem; @@ -197,8 +199,6 @@ public class SplitUtils { } public static SortedSet<Text> findSplits(ServerContext context, TabletMetadata tabletMetadata) { - var estimatedSize = - tabletMetadata.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE); @@ -207,7 +207,8 @@ public class SplitUtils { // anymore. 
int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); - if (estimatedSize <= threshold) { + var estimatedSize = tabletMetadata.getFileSize(); + if (!needsSplit(context, tabletMetadata)) { return new TreeSet<>(); } @@ -295,4 +296,39 @@ public class SplitUtils { return splits; } + + public static boolean needsSplit(ServerContext context, TabletMetadata tabletMetadata) { + var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); + var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + return needsSplit(splitThreshold, tabletMetadata); + } + + public static boolean needsSplit(final Configuration tableConf, TabletMetadata tabletMetadata) { + var splitThreshold = ConfigurationTypeHelper + .getFixedMemoryAsBytes(tableConf.get(Property.TABLE_SPLIT_THRESHOLD.getKey())); + return needsSplit(splitThreshold, tabletMetadata); + } + + public static boolean needsSplit(long splitThreshold, TabletMetadata tabletMetadata) { + return tabletMetadata.getFileSize() > splitThreshold; + } + + public static UnSplittableMetadata toUnSplittable(ServerContext context, + TabletMetadata tabletMetadata) { + var tableConf = context.getTableConfiguration(tabletMetadata.getTableId()); + var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + var maxEndRowSize = tableConf.getAsBytes(Property.TABLE_MAX_END_ROW_SIZE); + int maxFilesToOpen = tableConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); + + var unSplittableMetadata = UnSplittableMetadata.toUnSplittable(tabletMetadata.getExtent(), + splitThreshold, maxEndRowSize, maxFilesToOpen, tabletMetadata.getFiles()); + + log.trace( + "Created unsplittable metadata for tablet {}. 
splitThreshold: {}, maxEndRowSize:{}, maxFilesToOpen: {}, hashCode: {}", + tabletMetadata.getExtent(), splitThreshold, maxEndRowSize, maxFilesToOpen, + unSplittableMetadata); + + return unSplittableMetadata; + } + } diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 89e22a09de..5f3132acd3 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -19,8 +19,10 @@ package org.apache.accumulo.server.constraints; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.lang.reflect.Method; import java.util.Base64; @@ -31,6 +33,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -43,9 +46,11 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; 
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -493,17 +498,17 @@ public class MetadataConstraintsTest { @Test public void testCompacted() { - testFateCqValidation(CompactedColumnFamily.STR_NAME); + testFateCqValidation(CompactedColumnFamily.STR_NAME, (short) 13); } @Test public void testUserCompactionRequested() { - testFateCqValidation(UserCompactionRequestedColumnFamily.STR_NAME); + testFateCqValidation(UserCompactionRequestedColumnFamily.STR_NAME, (short) 14); } // Verify that columns that store a FateId in their CQ // validate and only allow a correctly formatted FateId - private void testFateCqValidation(String column) { + private void testFateCqValidation(String column, short violation) { MetadataConstraints mc = new MetadataConstraints(); Mutation m; List<Short> violations; @@ -519,7 +524,54 @@ public class MetadataConstraintsTest { violations = mc.check(createEnv(), m); assertNotNull(violations); assertEquals(1, violations.size()); - assertEquals(Short.valueOf((short) 13), violations.get(0)); + assertEquals(violation, violations.get(0)); + } + + @Test + public void testUnsplittableColumn() { + MetadataConstraints mc = new MetadataConstraints(); + Mutation m; + List<Short> violations; + + StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); + var unsplittableMeta = UnSplittableMetadata + .toUnSplittable(KeyExtent.fromMetaRow(new Text("0;foo")), 100, 110, 120, Set.of(sf1)); + + m = new Mutation(new Text("0;foo")); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(m, new Value(unsplittableMeta.toBase64())); + 
violations = mc.check(createEnv(), m); + assertNull(violations); + + // Verify empty value not allowed + m = new Mutation(new Text("0;foo")); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(m, new Value()); + violations = mc.check(createEnv(), m); + assertNotNull(violations); + assertEquals(2, violations.size()); + assertIterableEquals(List.of((short) 6, (short) 15), violations); + + // test invalid args + KeyExtent extent = KeyExtent.fromMetaRow(new Text("0;foo")); + assertThrows(IllegalArgumentException.class, + () -> UnSplittableMetadata.toUnSplittable(extent, -100, 110, 120, Set.of(sf1))); + assertThrows(IllegalArgumentException.class, + () -> UnSplittableMetadata.toUnSplittable(extent, 100, -110, 120, Set.of(sf1))); + assertThrows(IllegalArgumentException.class, + () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, -120, Set.of(sf1))); + assertThrows(NullPointerException.class, + () -> UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, null)); + + // Test metadata constraints validate invalid hashcode + m = new Mutation(new Text("0;foo")); + unsplittableMeta = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1)); + // partial hashcode is invalid + var invalidHashCode = + unsplittableMeta.toBase64().substring(0, unsplittableMeta.toBase64().length() - 1); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(m, new Value(invalidHashCode)); + violations = mc.check(createEnv(), m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 15), violations.get(0)); } // Encode a row how it would appear in Json diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/split/SplitUtilsTest.java similarity index 99% rename from server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java rename to server/base/src/test/java/org/apache/accumulo/server/split/SplitUtilsTest.java index 
80258e4914..e8281e4c08 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/split/SplitUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/split/SplitUtilsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager.split; +package org.apache.accumulo.server.split; import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toList; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java index 695f5650cc..7e3acd8d7f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java @@ -18,15 +18,22 @@ */ package org.apache.accumulo.manager.tableOps.split; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; + +import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Consumer; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.split.SplitUtils; import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.split.SplitUtils; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +52,9 @@ public class FindSplits extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager manager) 
throws Exception { var extent = splitInfo.getOriginal(); - var tabletMetadata = manager.getContext().getAmple().readTablet(extent); + var ample = manager.getContext().getAmple(); + var tabletMetadata = ample.readTablet(extent); + Optional<UnSplittableMetadata> computedUnsplittable = Optional.empty(); if (tabletMetadata == null) { log.trace("Table {} no longer exist, so not gonna try to find a split point for it", extent); @@ -58,12 +67,25 @@ public class FindSplits extends ManagerRepo { return null; } + // The TabletManagementIterator should normally not be trying to split if the tablet was marked + // as unsplittable and the metadata hasn't changed so check that the metadata is different + if (tabletMetadata.getUnSplittable() != null) { + computedUnsplittable = + Optional.of(SplitUtils.toUnSplittable(manager.getContext(), tabletMetadata)); + if (tabletMetadata.getUnSplittable().equals(computedUnsplittable.orElseThrow())) { + log.debug("Not splitting {} because unsplittable metadata is present and did not change", + extent); + return null; + } + } + if (!tabletMetadata.getLogs().isEmpty()) { // This code is only called by system initiated splits, so if walogs are present it probably // makes sense to wait for the data in them to be written to a file before finding splits // points. log.debug("Not splitting {} because it has walogs {}", tabletMetadata.getExtent(), tabletMetadata.getLogs().size()); + return null; } SortedSet<Text> splits = SplitUtils.findSplits(manager.getContext(), tabletMetadata); @@ -73,15 +95,59 @@ public class FindSplits extends ManagerRepo { } if (splits.isEmpty()) { - log.info("Tablet {} needs to split, but no split points could be found.", - tabletMetadata.getExtent()); - // ELASTICITY_TODO record the fact that tablet is un-splittable in metadata table in a new - // column. Record the config used to reach this decision and a hash of the file. 
The tablet - // mgmt iterator can inspect this column and only try to split the tablet when something has - // changed. + Consumer<ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} unsplittable metadata update for {} was rejected ", fateId, + result.getExtent()); + } + }; + + try (var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { + // No split points were found, so we need to check if the tablet still + // needs to be split but is unsplittable, or if a split is not needed + + // Case 1: If a split is needed then set the unsplittable marker as no split + // points could be found so that we don't keep trying again until the + // split metadata is changed + if (SplitUtils.needsSplit(manager.getContext(), tabletMetadata)) { + log.info("Tablet {} needs to split, but no split points could be found.", + tabletMetadata.getExtent()); + var unSplittableMeta = computedUnsplittable + .orElseGet(() -> SplitUtils.toUnSplittable(manager.getContext(), tabletMetadata)); + + // With the current design we don't need to require the files to be the same + // for correctness as the TabletManagementIterator will detect the difference + // when computing the hash and retry a new split operation if there is not a match. + // But if we already know there's a change now, it would be more efficient to fail and + // retry the current fate op vs completing and having the iterator submit a new one. + log.debug("Setting unsplittable metadata on tablet {}. hashCode: {}", + tabletMetadata.getExtent(), unSplittableMeta); + var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tabletMetadata, FILES).setUnSplittable(unSplittableMeta); + mutator.submit(tm -> unSplittableMeta.equals(tm.getUnSplittable())); + + // Case 2: If the unsplittable marker has already been previously set, but we do not need + // to split then clear the marker. 
This could happen in some scenarios such as + // a compaction that shrinks a previously unsplittable tablet below the threshold + // or if the threshold has been raised higher because the tablet management iterator + // will try and split any time the computed metadata changes. + } else if (tabletMetadata.getUnSplittable() != null) { + log.info("Tablet {} no longer needs to split, deleting unsplittable marker.", + tabletMetadata.getExtent()); + var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tabletMetadata, FILES).deleteUnSplittable(); + mutator.submit(tm -> tm.getUnSplittable() == null); + // Case 3: The table config and/or set of files changed since the tablet mgmt iterator + // examined this tablet. + } else { + log.debug("Tablet {} no longer needs to split, ignoring it.", tabletMetadata.getExtent()); + } + } + return null; } return new PreSplit(extent, splits); } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index f2d1501ae5..808ebf8b3a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -282,6 +282,12 @@ public class UpdateTablets extends ManagerRepo { mutator.deleteLocation(tabletMetadata.getLast()); } + // Clean up any previous unsplittable marker + if (tabletMetadata.getUnSplittable() != null) { + mutator.deleteUnSplittable(); + log.debug("{} deleting unsplittable metadata from {} because of split", fateId, newExtent); + } + mutator.submit(tm -> false); var result = tabletsMutator.process().get(splitInfo.getOriginal()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index baccd84680..226bc53034 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.split.Splitter; @@ -88,7 +89,7 @@ public class UpdateTabletsTest { ColumnType.LOADED, ColumnType.USER_COMPACTION_REQUESTED, ColumnType.MERGED, ColumnType.LAST, ColumnType.SCANS, ColumnType.DIR, ColumnType.CLONED, ColumnType.FLUSH_ID, ColumnType.FLUSH_NONCE, ColumnType.SUSPEND, ColumnType.AVAILABILITY, - ColumnType.HOSTING_REQUESTED, ColumnType.COMPACTED); + ColumnType.HOSTING_REQUESTED, ColumnType.COMPACTED, ColumnType.UNSPLITTABLE); /** * The purpose of this test is to catch new tablet metadata columns that were added w/o @@ -261,6 +262,9 @@ public class UpdateTabletsTest { EasyMock.expect(tabletMeta.getHostingRequested()).andReturn(true).atLeastOnce(); EasyMock.expect(tabletMeta.getSuspend()).andReturn(suspendingTServer).atLeastOnce(); EasyMock.expect(tabletMeta.getLast()).andReturn(lastLocation).atLeastOnce(); + UnSplittableMetadata usm = + UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet()); + EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tabletMeta); @@ -340,6 +344,7 @@ public class UpdateTabletsTest { 
EasyMock.expect(tablet3Mutator.deleteHostingRequested()).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteSuspension()).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteLocation(lastLocation)).andReturn(tablet3Mutator); + EasyMock.expect(tablet3Mutator.deleteUnSplittable()).andReturn(tablet3Mutator); tablet3Mutator.submit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); EasyMock.expect(tabletsMutator.mutateTablet(origExtent)).andReturn(tablet3Mutator); diff --git a/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java index 68c21d67dd..b810707cba 100644 --- a/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java +++ b/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java @@ -44,9 +44,13 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.split.SplitUtils; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; @@ -129,7 +133,8 @@ public class LargeSplitRowIT extends ConfigurableMacBase { Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K", Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64", - Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000" + Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000", + Property.TABLE_MAJC_RATIO.getKey(), "9999" ); // @formatter:on 
client.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)); @@ -155,7 +160,17 @@ public class LargeSplitRowIT extends ConfigurableMacBase { // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time // for the table to split if need be. client.tableOperations().flush(tableName, new Text(), new Text("z"), true); - Thread.sleep(500L); + + // Wait for the tablet to be marked as unsplittable due to the system split running + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Wait.waitFor(() -> getServerContext().getAmple() + .readTablet(new KeyExtent(tableId, null, null)).getUnSplittable() != null, + Wait.MAX_WAIT_MILLIS, 100); + + // Verify that the unsplittable column is read correctly + TabletMetadata tm = + getServerContext().getAmple().readTablet(new KeyExtent(tableId, null, null)); + assertEquals(tm.getUnSplittable(), SplitUtils.toUnSplittable(getServerContext(), tm)); // Make sure all the data that was put in the table is still correct int count = 0; @@ -242,6 +257,164 @@ public class LargeSplitRowIT extends ConfigurableMacBase { } } + @Test + @Timeout(60) + public void testUnsplittableColumn() throws Exception { + log.info("Unsplittable Column Test"); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + // make a table and lower the configuration properties + // @formatter:off + var maxEndRow = 100; + Map<String,String> props = Map.of( + Property.TABLE_SPLIT_THRESHOLD.getKey(), "1K", + Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64", + Property.TABLE_MAX_END_ROW_SIZE.getKey(), "" + maxEndRow, + Property.TABLE_MAJC_RATIO.getKey(), "9999" + ); + // @formatter:on + + final String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)); + + // Create a key for a table entry that is 
longer than the allowed size for an + // end row and fill this key with all m's except the last spot + byte[] data = new byte[maxEndRow + 1]; + Arrays.fill(data, 0, data.length - 2, (byte) 'm'); + + final int numOfMutations = 20; + try (BatchWriter batchWriter = client.createBatchWriter(tableName)) { + // Make the last place in the key different for every entry added to the table + for (int i = 0; i < numOfMutations; i++) { + data[data.length - 1] = (byte) i; + Mutation m = new Mutation(data); + m.put("cf", "cq", "value"); + batchWriter.addMutation(m); + } + } + // Flush the BatchWriter and table + client.tableOperations().flush(tableName, null, null, true); + + // Wait for the tablets to be marked as unsplittable due to the system split running + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Wait.waitFor(() -> getServerContext().getAmple() + .readTablet(new KeyExtent(tableId, null, null)).getUnSplittable() != null, + Wait.MAX_WAIT_MILLIS, 100); + + // Verify that the unsplittable column is read correctly + TabletMetadata tm = + getServerContext().getAmple().readTablet(new KeyExtent(tableId, null, null)); + var unsplittable = tm.getUnSplittable(); + assertEquals(unsplittable, SplitUtils.toUnSplittable(getServerContext(), tm)); + + // Make sure no splits occurred in the table + assertTrue(client.tableOperations().listSplits(tableName).isEmpty()); + + // Bump the value for max end row by 1, we should still not be able to split but this should + // trigger an update to the unsplittable metadata value + client.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), + "101"); + + // wait for the unsplittable marker to be set to a new value due to the property change + Wait.waitFor(() -> { + var updatedUnsplittable = getServerContext().getAmple() + .readTablet(new KeyExtent(tableId, null, null)).getUnSplittable(); + return updatedUnsplittable != null && !updatedUnsplittable.equals(unsplittable); + }, 
Wait.MAX_WAIT_MILLIS, 100); + // recheck that the computed meta is correct after property update + tm = getServerContext().getAmple().readTablet(new KeyExtent(tableId, null, null)); + assertEquals(tm.getUnSplittable(), SplitUtils.toUnSplittable(getServerContext(), tm)); + + // Bump max end row size and verify split occurs and unsplittable column is cleaned up + client.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), + "500"); + + // Wait for splits to occur + assertTrue(client.tableOperations().listSplits(tableName).isEmpty()); + Wait.waitFor(() -> !client.tableOperations().listSplits(tableName).isEmpty(), + Wait.MAX_WAIT_MILLIS, 100); + + // Verify all tablets have no unsplittable metadata column + Wait.waitFor(() -> { + try (var tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableId).build()) { + return tabletsMetadata.stream() + .allMatch(tabletMetadata -> tabletMetadata.getUnSplittable() == null); + } + }, Wait.MAX_WAIT_MILLIS, 100); + } + } + + // Test the unsplittable column is cleaned up if a previously marked unsplittable tablet + // no longer needs to be split + @Test + @Timeout(60) + public void testUnsplittableCleanup() throws Exception { + log.info("Unsplittable Column Cleanup"); + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + // make a table and lower the configuration properties + // @formatter:off + Map<String,String> props = Map.of( + Property.TABLE_SPLIT_THRESHOLD.getKey(), "1K", + Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64", + Property.TABLE_MAJC_RATIO.getKey(), "9999" + ); + // @formatter:on + + final String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)); + + byte[] data = new byte[100]; + Arrays.fill(data, 0, data.length - 1, (byte) 'm'); + + // Write enough data that will cause a split. 
The row is not too large for a split + // but all the rows are the same so tablets won't be able to split except for + // the last tablet (null end row) + final int numOfMutations = 20; + try (BatchWriter batchWriter = client.createBatchWriter(tableName)) { + // Every entry uses the same row bytes; only the column qualifier differs per entry + for (int i = 0; i < numOfMutations; i++) { + Mutation m = new Mutation(data); + m.put("cf", "cq" + i, "value"); + batchWriter.addMutation(m); + } + } + // Flush the BatchWriter and table + client.tableOperations().flush(tableName, null, null, true); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + // Wait for a tablet to be marked as unsplittable due to the system split running + // There is enough data to split more than once so at least one tablet should be marked + // as unsplittable due to the same end row for all keys after the default tablet is split + Wait.waitFor(() -> { + try (var tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableId).build()) { + return tabletsMetadata.stream().anyMatch(tm -> tm.getUnSplittable() != null); + } + }, Wait.MAX_WAIT_MILLIS, 100); + + var splits = client.tableOperations().listSplits(tableName); + + // Bump split threshold and verify marker is cleared + client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), + "1M"); + + // All tablets should now be cleared of the unsplittable marker, and we should have the + // same number of splits as before + Wait.waitFor(() -> { + try (var tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableId).build()) { + return tabletsMetadata.stream().allMatch(tm -> tm.getUnSplittable() == null); + } + }, Wait.MAX_WAIT_MILLIS, 100); + + // no more splits should have happened + assertEquals(splits, client.tableOperations().listSplits(tableName)); + } + } + private void automaticSplit(AccumuloClient client, int max, int spacing) throws 
Exception { // make a table and lower the configuration properties // @formatter:off