This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 7701e59022 Add Mergeability column to support automatic merges (#5187) 7701e59022 is described below commit 7701e590222029a8cd501f3a94791000bf8f8122 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Jan 10 13:22:23 2025 -0500 Add Mergeability column to support automatic merges (#5187) Add Mergeability column to support automatic merges This adds a new Mergeability column for marking if/when tablets are eligible to be merged by the system based on a threshold. The column stores two values. The first is a duration: a delay before a tablet can be merged, measured relative to the time the Manager uses (Steady time). The second is the current Steady time value at the moment of insertion, so that later the delay can be added to that original time to see if enough time has passed. The Steady Time value is only stored when a delay is set (i.e. the delay is >= 0); when the tablet should never merge, it is null because it would not be used. The Steady Time value isn't technically needed for a delay of 0, because 0 means the tablet is always eligible to merge, but it could be useful if we later want to change the value, and it allows logging when the value was stored. There are 2 possible states for the delay value: 1) null: the tablet will never automatically merge. 2) duration >= 0: the tablet is eligible to merge after the given delay (stored as a duration), relative to the current system Steady time. I.e., the tablet can be merged once the current manager time reaches or passes the delay value + the steady time value stored at insertion. If the duration is 0, the tablet can always merge (now). This change only adds the new column itself and populates it. The default is to never merge automatically in all cases except when the system automatically splits tablets. In that case the newly split tablets are marked as eligible to merge always (duration of 0). 
Future updates will add API enhancements to allow setting/viewing the mergeability setting, as well as to enable automatic merging by the system based on this new column value. When automatic merging is enabled, a user who wants to make a tablet eligible to be merged in the future would do so by setting a positive delay. For example, to make a tablet eligible to be merged 3 days from now, the user would set a duration of 3 days in the API (future PR), and when the system inserts the value into metadata it will also include the current SteadyTime at creation. Later, once the current steady time passes the original stored steady time value plus the delay, the tablet would be eligible to be merged. To enable merging immediately, the user can set the duration to 0 or use the TabletMergeability.always() helper, which is just a shortcut for a duration of 0. --- .../core/client/admin/TabletMergeability.java | 136 +++++++++++++++++++ .../accumulo/core/metadata/schema/Ample.java | 2 + .../core/metadata/schema/MetadataSchema.java | 4 + .../schema/TabletMergeabilityMetadata.java | 144 +++++++++++++++++++++ .../core/metadata/schema/TabletMetadata.java | 23 +++- .../metadata/schema/TabletMetadataBuilder.java | 9 ++ .../core/metadata/schema/TabletMutatorBase.java | 7 + .../core/metadata/schema/TabletsMetadata.java | 4 + .../apache/accumulo/core/util/time/SteadyTime.java | 4 +- .../core/metadata/schema/TabletMetadataTest.java | 16 ++- .../server/constraints/MetadataConstraints.java | 11 ++ .../server/init/FileSystemInitializer.java | 4 +- .../constraints/MetadataConstraintsTest.java | 49 +++++++ .../accumulo/manager/FateServiceHandler.java | 3 +- .../accumulo/manager/tableOps/TableInfo.java | 10 ++ .../manager/tableOps/create/CreateTable.java | 5 +- .../manager/tableOps/create/PopulateMetadata.java | 1 + .../manager/tableOps/merge/MergeTablets.java | 1 + .../manager/tableOps/split/FindSplits.java | 4 +- .../accumulo/manager/tableOps/split/PreSplit.java | 8 +- 
.../accumulo/manager/tableOps/split/SplitInfo.java | 7 +- .../manager/tableOps/split/UpdateTablets.java | 5 +- .../manager/tableOps/merge/MergeTabletsTest.java | 39 ++++-- .../manager/tableOps/split/UpdateTabletsTest.java | 33 +++-- .../apache/accumulo/test/ample/TestAmpleIT.java | 1 + .../accumulo/test/ample/metadata/TestAmple.java | 2 + .../apache/accumulo/test/fate/ManagerRepoIT.java | 2 +- .../accumulo/test/functional/AddSplitIT.java | 12 ++ .../accumulo/test/functional/MetadataIT.java | 5 +- .../apache/accumulo/test/functional/SplitIT.java | 12 ++ 30 files changed, 525 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TabletMergeability.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TabletMergeability.java new file mode 100644 index 0000000000..c9d4de31e8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TabletMergeability.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.client.admin; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; + +import com.google.common.base.Preconditions; + +/** + * @since 4.0.0 + */ +public class TabletMergeability implements Serializable { + private static final long serialVersionUID = 1L; + + private static final TabletMergeability NEVER = new TabletMergeability(); + private static final TabletMergeability ALWAYS = new TabletMergeability(Duration.ZERO); + + private final Duration delay; + + private TabletMergeability(Duration delay) { + this.delay = Objects.requireNonNull(delay); + } + + // Edge case for NEVER + private TabletMergeability() { + this.delay = null; + } + + /** + * Determines if the configured delay signals a tablet is never eligible to be automatically + * merged. + * + * @return true if never mergeable, else false + */ + public boolean isNever() { + return this.delay == null; + } + + /** + * Determines if the configured delay signals a tablet is always eligible to be automatically + * merged now. 
(Has a delay of 0) + * + * @return true if always mergeable now, else false + */ + public boolean isAlways() { + return delay != null && this.delay.isZero(); + } + + /** + * Returns an Optional duration of the delay which is one of: + * + * <ul> + * <li>empty (never)</li> + * <li>0 (now)</li> + * <li>positive delay</li> + * </ul> + * + * @return the configured mergeability delay + */ + public Optional<Duration> getDelay() { + return Optional.ofNullable(delay); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TabletMergeability that = (TabletMergeability) o; + return Objects.equals(delay, that.delay); + } + + @Override + public int hashCode() { + return Objects.hashCode(delay); + } + + @Override + public String toString() { + if (delay == null) { + return "TabletMergeability=NEVER"; + } + return "TabletMergeability=AFTER:" + delay.toMillis() + "ms"; + } + + /** + * Signifies that a tablet is never eligible to be automatically merged. + * + * @return a {@link TabletMergeability} with an empty delay signaling never merge + */ + public static TabletMergeability never() { + return NEVER; + } + + /** + * Signifies that a tablet is eligible now to be automatically merged + * + * @return a {@link TabletMergeability} with a delay of 0 signaling never merge + */ + public static TabletMergeability always() { + return ALWAYS; + } + + /** + * Creates a {@link TabletMergeability} that signals a tablet has a delay to a point in the future + * before it is automatically eligible to be merged. The duration must be positive value. + * + * @param delay the duration of the delay + * + * @return a {@link TabletMergeability} from the given delay. 
+ */ + public static TabletMergeability after(Duration delay) { + Preconditions.checkArgument(delay.toNanos() >= 0, "Duration of delay must be greater than 0."); + return new TabletMergeability(delay); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 367ee6fe64..c78e5661bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -393,6 +393,8 @@ public interface Ample { T putCloned(); + T putTabletMergeability(TabletMergeabilityMetadata tabletMergeability); + /** * By default the server lock is automatically added to mutations unless this method is set to * false. diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 5360c98274..885fd32623 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -158,6 +158,10 @@ public class MetadataSchema { public static final String REQUESTED_QUAL = "requestToHost"; public static final ColumnFQ REQUESTED_COLUMN = new ColumnFQ(NAME, new Text(REQUESTED_QUAL)); + public static final String MERGEABILITY_QUAL = "mergeability"; + public static final ColumnFQ MERGEABILITY_COLUMN = + new ColumnFQ(NAME, new Text(MERGEABILITY_QUAL)); + public static Value encodePrevEndRow(Text per) { if (per == null) { return new Value(new byte[] {0}); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java new file mode 100644 index 0000000000..0880e402d9 --- /dev/null +++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMergeabilityMetadata.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static org.apache.accumulo.core.util.LazySingletons.GSON; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.time.SteadyTime; + +import com.google.common.base.Preconditions; + +public class TabletMergeabilityMetadata implements Serializable { + private static final long serialVersionUID = 1L; + + private static final TabletMergeabilityMetadata NEVER = + new TabletMergeabilityMetadata(TabletMergeability.never());; + + private final TabletMergeability tabletMergeability; + private final SteadyTime steadyTime; + + private TabletMergeabilityMetadata(TabletMergeability tabletMergeability, SteadyTime steadyTime) { + this.tabletMergeability = Objects.requireNonNull(tabletMergeability); + this.steadyTime = steadyTime; + // This makes sure that 
SteadyTime is set if TabletMergeability has a delay, and is null + // if TabletMergeability is NEVER as we don't need to store it in that case + Preconditions.checkArgument(tabletMergeability.isNever() == (steadyTime == null), + "SteadyTime must be set if and only if TabletMergeability delay is >= 0"); + } + + private TabletMergeabilityMetadata(TabletMergeability tabletMergeability) { + this(tabletMergeability, null); + } + + public TabletMergeability getTabletMergeability() { + return tabletMergeability; + } + + public Optional<SteadyTime> getSteadyTime() { + return Optional.ofNullable(steadyTime); + } + + public boolean isMergeable(SteadyTime currentTime) { + if (tabletMergeability.isNever()) { + return false; + } + // Steady time should never be null unless TabletMergeability is NEVER + Preconditions.checkState(steadyTime != null, "SteadyTime should be set"); + var totalDelay = steadyTime.getDuration().plus(tabletMergeability.getDelay().orElseThrow()); + return currentTime.getDuration().compareTo(totalDelay) >= 0; + } + + private static class GSonData { + boolean never; + Long delay; + Long steadyTime; + } + + String toJson() { + GSonData jData = new GSonData(); + jData.never = tabletMergeability.isNever(); + jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null); + jData.steadyTime = steadyTime != null ? steadyTime.getNanos() : null; + return GSON.get().toJson(jData); + } + + static TabletMergeabilityMetadata fromJson(String json) { + GSonData jData = GSON.get().fromJson(json, GSonData.class); + if (jData.never) { + Preconditions.checkArgument(jData.delay == null && jData.steadyTime == null, + "delay and steadyTime should be null if mergeability 'never' is true"); + } else { + Preconditions.checkArgument(jData.delay != null && jData.steadyTime != null, + "delay and steadyTime should both be set if mergeability 'never' is false"); + } + TabletMergeability tabletMergeability = jData.never ? 
TabletMergeability.never() + : TabletMergeability.after(Duration.ofNanos(jData.delay)); + SteadyTime steadyTime = + jData.steadyTime != null ? SteadyTime.from(jData.steadyTime, TimeUnit.NANOSECONDS) : null; + return new TabletMergeabilityMetadata(tabletMergeability, steadyTime); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TabletMergeabilityMetadata that = (TabletMergeabilityMetadata) o; + return Objects.equals(tabletMergeability, that.tabletMergeability) + && Objects.equals(steadyTime, that.steadyTime); + } + + @Override + public int hashCode() { + return Objects.hash(tabletMergeability, steadyTime); + } + + @Override + public String toString() { + return "TabletMergeabilityMetadata{" + tabletMergeability + ", " + steadyTime + '}'; + } + + public static TabletMergeabilityMetadata never() { + return NEVER; + } + + public static TabletMergeabilityMetadata always(SteadyTime currentTime) { + return new TabletMergeabilityMetadata(TabletMergeability.always(), currentTime); + } + + public static TabletMergeabilityMetadata after(Duration delay, SteadyTime currentTime) { + return new TabletMergeabilityMetadata(TabletMergeability.after(delay), currentTime); + } + + public static Value toValue(TabletMergeabilityMetadata tmm) { + return new Value(tmm.toJson()); + } + + public static TabletMergeabilityMetadata fromValue(Value value) { + return TabletMergeabilityMetadata.fromJson(value.toString()); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 795ebfafed..2b59f78431 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_QUAL; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.MERGEABILITY_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.REQUESTED_QUAL; @@ -123,6 +124,7 @@ public class TabletMetadata { private final Set<FateId> compacted; private final Set<FateId> userCompactionsRequested; private final UnSplittableMetadata unSplittableMetadata; + private final TabletMergeabilityMetadata mergeability; private final Supplier<Long> fileSize; private TabletMetadata(Builder tmBuilder) { @@ -155,6 +157,7 @@ public class TabletMetadata { this.compacted = tmBuilder.compacted.build(); this.userCompactionsRequested = tmBuilder.userCompactionsRequested.build(); this.unSplittableMetadata = tmBuilder.unSplittableMetadata; + this.mergeability = Objects.requireNonNull(tmBuilder.mergeability); this.fileSize = Suppliers.memoize(() -> { // This code was using a java stream. While profiling SplitMillionIT, the stream was showing // up as hot when scanning 1 million tablets. Converted to a for loop to improve performance. 
@@ -198,7 +201,8 @@ public class TabletMetadata { SELECTED, COMPACTED, USER_COMPACTION_REQUESTED, - UNSPLITTABLE + UNSPLITTABLE, + MERGEABILITY } public static class Location { @@ -439,6 +443,11 @@ public class TabletMetadata { return unSplittableMetadata; } + public TabletMergeabilityMetadata getTabletMergeability() { + ensureFetched(ColumnType.MERGEABILITY); + return mergeability; + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("tableId", tableId) @@ -453,7 +462,8 @@ public class TabletMetadata { .append("operationId", operationId).append("selectedFiles", selectedFiles) .append("futureAndCurrentLocationSet", futureAndCurrentLocationSet) .append("userCompactionsRequested", userCompactionsRequested) - .append("unSplittableMetadata", unSplittableMetadata).toString(); + .append("unSplittableMetadata", unSplittableMetadata).append("mergeability", mergeability) + .toString(); } public List<Entry<Key,Value>> getKeyValues() { @@ -527,6 +537,9 @@ public class TabletMetadata { case REQUESTED_QUAL: tmBuilder.onDemandHostingRequested(true); break; + case MERGEABILITY_QUAL: + tmBuilder.mergeability(TabletMergeabilityMetadata.fromValue(kv.getValue())); + break; default: throw new IllegalStateException("Unexpected TabletColumnFamily qualifier: " + qual); } @@ -689,7 +702,7 @@ public class TabletMetadata { private final ImmutableSet.Builder<FateId> compacted = ImmutableSet.builder(); private final ImmutableSet.Builder<FateId> userCompactionsRequested = ImmutableSet.builder(); private UnSplittableMetadata unSplittableMetadata; - // private Supplier<Long> fileSize; + private TabletMergeabilityMetadata mergeability = TabletMergeabilityMetadata.never(); void table(TableId tableId) { this.tableId = tableId; @@ -799,6 +812,10 @@ public class TabletMetadata { this.unSplittableMetadata = unSplittableMetadata; } + void mergeability(TabletMergeabilityMetadata mergeability) { + this.mergeability = mergeability; + } + void 
keyValue(Entry<Key,Value> kv) { if (this.keyValues == null) { this.keyValues = ImmutableList.builder(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index 44f1915e0e..8ca33d9eb7 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -30,6 +30,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; @@ -312,6 +313,14 @@ public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadata return this; } + @Override + public TabletMetadataBuilder + putTabletMergeability(TabletMergeabilityMetadata tabletMergeability) { + fetched.add(MERGEABILITY); + internalBuilder.putTabletMergeability(tabletMergeability); + return this; + } + @Override public TabletMetadataBuilder automaticallyPutServerLock(boolean b) { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 6052c73a79..4f39eda7b7 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -390,6 +390,13 @@ public abstract class TabletMutatorBase<T extends Ample.TabletUpdates<T>> return getThis(); } + @Override + public T putTabletMergeability(TabletMergeabilityMetadata tabletMergeability) { + TabletColumnFamily.MERGEABILITY_COLUMN.put(mutation, + TabletMergeabilityMetadata.toValue(tabletMergeability)); + return getThis(); + } + public void setCloseAfterMutate(AutoCloseable closeable) { this.closeAfterMutate = closeable; } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index a3914ea0ed..04115dfaea 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -77,6 +77,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Me import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter; @@ -394,6 +395,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable case UNSPLITTABLE: qualifiers.add(SplitColumnFamily.UNSPLITTABLE_COLUMN); break; + case MERGEABILITY: + 
qualifiers.add(TabletColumnFamily.MERGEABILITY_COLUMN); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java index d16f15c201..57b348b23d 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java +++ b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.util.time; +import java.io.Serializable; import java.time.Duration; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -30,7 +31,8 @@ import com.google.common.base.Preconditions; * is not expected to represent real world date times, its main use is for computing deltas similar * System.nanoTime but across JVM processes. */ -public class SteadyTime implements Comparable<SteadyTime> { +public class SteadyTime implements Comparable<SteadyTime>, Serializable { + private static final long serialVersionUID = 1L; private final Duration time; diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 9f4ba14def..1381f44f33 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.reflect.Constructor; +import java.time.Duration; import java.util.AbstractMap; import java.util.EnumSet; import java.util.LinkedHashSet; @@ -643,6 +644,8 @@ public class TabletMetadataTest { .putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, loadedFateId1) .putBulkFile(rf2, 
loadedFateId2).putFlushId(27).putDirName("dir1").putScan(sf3).putScan(sf4) .putCompacted(compactFateId1).putCompacted(compactFateId2).putCloned() + .putTabletMergeability( + TabletMergeabilityMetadata.always(SteadyTime.from(1, TimeUnit.SECONDS))) .build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED, UNSPLITTABLE); assertEquals(extent, tm.getExtent()); @@ -662,6 +665,8 @@ public class TabletMetadataTest { assertFalse(tm.hasMerged()); assertNull(tm.getUnSplittable()); assertEquals("OK", tm.getCloned()); + assertEquals(TabletMergeabilityMetadata.always(SteadyTime.from(1, TimeUnit.SECONDS)), + tm.getTabletMergeability()); assertThrows(IllegalStateException.class, tm::getOperationId); assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); @@ -688,6 +693,7 @@ public class TabletMetadataTest { assertThrows(IllegalStateException.class, tm2::hasMerged); assertThrows(IllegalStateException.class, tm2::getUserCompactionsRequested); assertThrows(IllegalStateException.class, tm2::getUnSplittable); + assertThrows(IllegalStateException.class, tm2::getTabletAvailability); var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); CompactionMetadata ecm = @@ -707,7 +713,10 @@ public class TabletMetadataTest { .putSuspension(ser1, SteadyTime.from(45L, TimeUnit.MILLISECONDS)) .putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1).putWal(le2) .setHostingRequested().putSelectedFiles(selFiles).setMerged() - .putUserCompactionRequested(selFilesFateId).setUnSplittable(unsplittableMeta).build(); + .putUserCompactionRequested(selFilesFateId).setUnSplittable(unsplittableMeta) + .putTabletMergeability(TabletMergeabilityMetadata.after(Duration.ofDays(3), + SteadyTime.from(45L, TimeUnit.MILLISECONDS))) + .build(); assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet()); assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles()); @@ -724,6 +733,11 @@ public class 
TabletMetadataTest { assertTrue(tm3.hasMerged()); assertTrue(tm3.getUserCompactionsRequested().contains(selFilesFateId)); assertEquals(unsplittableMeta, tm3.getUnSplittable()); + var tmm = tm3.getTabletMergeability(); + assertEquals(Duration.ofDays(3), tmm.getTabletMergeability().getDelay().orElseThrow()); + assertEquals(SteadyTime.from(45L, TimeUnit.MILLISECONDS), tmm.getSteadyTime().orElseThrow()); + assertTrue(tmm.isMergeable(SteadyTime.from(Duration.ofHours(73)))); + assertFalse(tmm.isMergeable(SteadyTime.from(Duration.ofHours(72)))); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index f91828361f..0d2988423c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -59,6 +59,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ta import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Upgrade12to13; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; @@ -97,6 +98,7 @@ public class MetadataConstraints implements Constraint { TabletColumnFamily.REQUESTED_COLUMN, ServerColumnFamily.SELECTED_COLUMN, SplitColumnFamily.UNSPLITTABLE_COLUMN, + TabletColumnFamily.MERGEABILITY_COLUMN, Upgrade12to13.COMPACT_COL); @SuppressWarnings("deprecation") @@ -297,6 +299,8 @@ public class MetadataConstraints implements Constraint { return 
"Invalid unsplittable column"; case 4005: return "Malformed availability value"; + case 4006: + return "Malformed mergeability value"; } return null; @@ -376,6 +380,13 @@ public class MetadataConstraints implements Constraint { addViolation(violations, 4005); } break; + case (TabletColumnFamily.MERGEABILITY_QUAL): + try { + TabletMergeabilityMetadata.fromValue(new Value(columnUpdate.getValue())); + } catch (IllegalArgumentException e) { + addViolation(violations, 4006); + } + break; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java index b315f1a58c..ef6536da09 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; @@ -93,7 +94,8 @@ public class FileSystemInitializer { KeyExtent keyExtent = new KeyExtent(tableId, endRow, prevEndRow); var builder = TabletMetadata.builder(keyExtent).putDirName(dirName) .putTime(new MetadataTime(0, TimeType.LOGICAL)) - .putTabletAvailability(TabletAvailability.HOSTED).putPrevEndRow(prevEndRow); + .putTabletAvailability(TabletAvailability.HOSTED) + .putTabletMergeability(TabletMergeabilityMetadata.never()).putPrevEndRow(prevEndRow); for (String file : files) { builder.putFile(new ReferencedTabletFile(new Path(file)).insert(), new DataFileValue(0, 
0)); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index dfcf970330..5407d10de5 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Base64; import java.util.List; import java.util.Set; @@ -56,6 +57,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Su import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.ServerContext; @@ -639,6 +641,53 @@ public class MetadataConstraintsTest { assertEquals((short) 3102, violations.get(0)); } + @Test + public void testMergeabilityColumn() { + MetadataConstraints mc = new MetadataConstraints(); + Mutation m; + List<Short> violations; + + // Delay must be >= 0 + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, + new Value("{\"delay\":-1,\"steadyTime\":1,\"never\"=false}")); + assertViolation(mc, m, (short) 4006); + + // SteadyTime must be null if never is true + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, new Value("{\"steadyTime\":1,\"never\"=true}")); + 
assertViolation(mc, m, (short) 4006); + + // delay must be null if never is true + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, new Value("{\"delay\":1,\"never\"=true}")); + assertViolation(mc, m, (short) 4006); + + // SteadyTime must be set if delay positive + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, new Value("{\"delay\":10,\"never\"=false}")); + assertViolation(mc, m, (short) 4006); + + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, + TabletMergeabilityMetadata.toValue(TabletMergeabilityMetadata.never())); + violations = mc.check(createEnv(), m); + assertTrue(violations.isEmpty()); + + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, TabletMergeabilityMetadata + .toValue(TabletMergeabilityMetadata.always(SteadyTime.from(1, TimeUnit.SECONDS)))); + violations = mc.check(createEnv(), m); + assertTrue(violations.isEmpty()); + + m = new Mutation(new Text("0;foo")); + TabletColumnFamily.MERGEABILITY_COLUMN.put(m, + TabletMergeabilityMetadata.toValue(TabletMergeabilityMetadata.after(Duration.ofDays(3), + SteadyTime.from(Duration.ofHours(1))))); + violations = mc.check(createEnv(), m); + assertTrue(violations.isEmpty()); + } + // Encode a row how it would appear in Json private static String encodeRowForMetadata(String row) { try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index de1bae81a8..7cdde03dc5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -80,6 +80,7 @@ import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.manager.thrift.TFateOperation; 
import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.TextUtil; @@ -256,7 +257,7 @@ class FateServiceHandler implements FateService.Iface { manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, - initialTabletAvailability, namespaceId)), + initialTabletAvailability, namespaceId, TabletMergeabilityMetadata.never())), autoCleanup, goalMessage); break; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java index 5294c3ef04..8852a41cd9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TableInfo.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.hadoop.fs.Path; public class TableInfo implements Serializable { @@ -51,6 +52,8 @@ public class TableInfo implements Serializable { private TabletAvailability initialTabletAvailability; + private TabletMergeabilityMetadata initialTabletMergeability; + public TabletAvailability getInitialTabletAvailability() { return initialTabletAvailability; } @@ -133,4 +136,11 @@ public class TableInfo implements Serializable { this.initialSplitSize = initialSplitSize; } + public TabletMergeabilityMetadata getInitialTabletMergeability() { + return 
initialTabletMergeability; + } + + public void setInitialTabletMergeability(TabletMergeabilityMetadata initialTabletMergeability) { + this.initialTabletMergeability = initialTabletMergeability; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java index 3f5a379c8a..f0d74597db 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/CreateTable.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.TableInfo; @@ -47,7 +48,8 @@ public class CreateTable extends ManagerRepo { public CreateTable(String user, String tableName, TimeType timeType, Map<String,String> props, Path splitPath, int splitCount, Path splitDirsPath, InitialTableState initialTableState, - TabletAvailability initialTabletAvailability, NamespaceId namespaceId) { + TabletAvailability initialTabletAvailability, NamespaceId namespaceId, + TabletMergeabilityMetadata initialTabletMergeability) { tableInfo = new TableInfo(); tableInfo.setTableName(tableName); tableInfo.setTimeType(timeType); @@ -59,6 +61,7 @@ public class CreateTable extends ManagerRepo { tableInfo.setInitialTableState(initialTableState); tableInfo.setSplitDirsPath(splitDirsPath); tableInfo.setInitialTabletAvailability(initialTabletAvailability); + tableInfo.setInitialTabletMergeability(initialTabletMergeability); } @Override diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index 08cfcd194f..2b987c7a4e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -92,6 +92,7 @@ class PopulateMetadata extends ManagerRepo { tabletMutator.putDirName(dirName); tabletMutator.putTime(new MetadataTime(0, tableInfo.getTimeType())); tabletMutator.putTabletAvailability(tableInfo.getInitialTabletAvailability()); + tabletMutator.putTabletMergeability(tableInfo.getInitialTabletMergeability()); tabletMutator.mutate(); prevSplit = split; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index cdd54ac143..5b490ab567 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -186,6 +186,7 @@ public class MergeTablets extends ManagerRepo { tabletMutator.putTabletAvailability( DeleteRows.getMergeTabletAvailability(range, tabletAvailabilities)); tabletMutator.putPrevEndRow(firstTabletMeta.getPrevEndRow()); + tabletMutator.putTabletMergeability(lastTabletMeta.getTabletMergeability()); // scan entries are related to a hosted tablet, this tablet is not hosted so can safely // delete these diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java index 60073e987a..c614ce4be2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/FindSplits.java @@ -48,7 +48,7 @@ public class FindSplits extends ManagerRepo { private final SplitInfo splitInfo; public FindSplits(KeyExtent extent) { - this.splitInfo = new SplitInfo(extent, new TreeSet<>()); + this.splitInfo = new SplitInfo(extent, new TreeSet<>(), true); } @Override @@ -156,7 +156,7 @@ public class FindSplits extends ManagerRepo { return null; } - return new PreSplit(extent, splits); + return new PreSplit(extent, splits, true); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 6d89878f95..41d8d039ea 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -50,12 +50,16 @@ public class PreSplit extends ManagerRepo { private final SplitInfo splitInfo; - public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits) { + public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits, boolean systemCreated) { Objects.requireNonNull(expectedExtent); Objects.requireNonNull(splits); Preconditions.checkArgument(!splits.isEmpty()); Preconditions.checkArgument(!expectedExtent.isRootTablet()); - this.splitInfo = new SplitInfo(expectedExtent, splits); + this.splitInfo = new SplitInfo(expectedExtent, splits, systemCreated); + } + + public PreSplit(KeyExtent expectedExtent, SortedSet<Text> splits) { + this(expectedExtent, splits, false); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java index 7d97e6a34e..14bac1fe7e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java @@ -36,12 +36,14 @@ public class SplitInfo implements Serializable { private final byte[] prevEndRow; private final byte[] endRow; private final byte[][] splits; + private final boolean systemCreated; - public SplitInfo(KeyExtent extent, SortedSet<Text> splits) { + public SplitInfo(KeyExtent extent, SortedSet<Text> splits, boolean systemCreated) { this.tableId = extent.tableId(); this.prevEndRow = extent.prevEndRow() == null ? null : TextUtil.getBytes(extent.prevEndRow()); this.endRow = extent.endRow() == null ? null : TextUtil.getBytes(extent.endRow()); this.splits = new byte[splits.size()][]; + this.systemCreated = systemCreated; int index = 0; for (var split : splits) { @@ -85,4 +87,7 @@ public class SplitInfo implements Serializable { return tablets; } + boolean isSystemCreated() { + return systemCreated; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index ce48d480b1..7da6f36af6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; @@ -218,7 +219,9 @@ public class UpdateTablets extends ManagerRepo { .debug("{} copying compacted marker 
to new child tablet {}", fateId, compactedFateId)); mutator.putTabletAvailability(tabletMetadata.getTabletAvailability()); - + mutator.putTabletMergeability( + splitInfo.isSystemCreated() ? TabletMergeabilityMetadata.always(manager.getSteadyTime()) + : TabletMergeabilityMetadata.never()); tabletMetadata.getLoaded().forEach((k, v) -> mutator.putBulkFile(k.getTabletFile(), v)); newTabletsFiles.get(newExtent).forEach(mutator::putFile); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 76a475105e..8fd15f78ec 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -32,6 +32,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; @@ -73,6 +74,7 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import 
org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder; import org.apache.accumulo.core.metadata.schema.TabletOperationId; @@ -110,7 +112,7 @@ public class MergeTabletsTest { private static final Set<TabletMetadata.ColumnType> COLUMNS_HANDLED_BY_MERGE = EnumSet.of(TIME, LOGS, FILES, PREV_ROW, OPID, LOCATION, ECOMP, SELECTED, LOADED, USER_COMPACTION_REQUESTED, MERGED, LAST, SCANS, DIR, CLONED, FLUSH_ID, FLUSH_NONCE, - SUSPEND, AVAILABILITY, HOSTING_REQUESTED, COMPACTED, UNSPLITTABLE); + SUSPEND, AVAILABILITY, HOSTING_REQUESTED, COMPACTED, UNSPLITTABLE, MERGEABILITY); /** * The purpose of this test is to catch new tablet metadata columns that were added w/o @@ -150,15 +152,18 @@ public class MergeTabletsTest { var availability = TabletAvailability.HOSTED; var lastLocation = TabletMetadata.Location.last("1.2.3.4:1234", "123456789"); var suspendingTServer = SuspendingTServer.fromValue(new Value("1.2.3.4:5|56")); - - var tablet1 = TabletMetadata.builder(ke1).putOperation(opid).putDirName("td1") - .putFile(file3, dfv3).putTime(MetadataTime.parse("L3")) - .putTabletAvailability(TabletAvailability.HOSTED).build(LOCATION, LOGS, FILES, ECOMP, - MERGED, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED); - var tablet2 = TabletMetadata.builder(ke2).putOperation(opid).putDirName("td2") - .putFile(file4, dfv4).putTime(MetadataTime.parse("L2")) - .putTabletAvailability(TabletAvailability.HOSTED).build(LOCATION, LOGS, FILES, ECOMP, - MERGED, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED); + var mergeability = TabletMergeabilityMetadata.always(SteadyTime.from(1, TimeUnit.SECONDS)); + + var tablet1 = + TabletMetadata.builder(ke1).putOperation(opid).putDirName("td1").putFile(file3, dfv3) + .putTime(MetadataTime.parse("L3")).putTabletAvailability(TabletAvailability.HOSTED) + .putTabletMergeability(mergeability).build(LOCATION, LOGS, FILES, ECOMP, MERGED, + COMPACTED, SELECTED, 
USER_COMPACTION_REQUESTED, LOADED, CLONED); + var tablet2 = + TabletMetadata.builder(ke2).putOperation(opid).putDirName("td2").putFile(file4, dfv4) + .putTime(MetadataTime.parse("L2")).putTabletAvailability(TabletAvailability.HOSTED) + .putTabletMergeability(mergeability).build(LOCATION, LOGS, FILES, ECOMP, MERGED, + COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED); var tabletFiles = Map.of(file1, dfv1, file2, dfv2); @@ -193,6 +198,7 @@ public class MergeTabletsTest { EasyMock.expect(lastTabletMeta.getSuspend()).andReturn(suspendingTServer).atLeastOnce(); EasyMock.expect(lastTabletMeta.getLast()).andReturn(lastLocation).atLeastOnce(); EasyMock.expect(lastTabletMeta.getUnSplittable()).andReturn(unsplittableMeta).atLeastOnce(); + EasyMock.expect(lastTabletMeta.getTabletMergeability()).andReturn(mergeability).atLeastOnce(); EasyMock.replay(lastTabletMeta, compactions); @@ -228,6 +234,10 @@ public class MergeTabletsTest { EasyMock.expect(tabletMutator.deleteSuspension()).andReturn(tabletMutator); EasyMock.expect(tabletMutator.deleteLocation(lastLocation)).andReturn(tabletMutator); EasyMock.expect(tabletMutator.deleteUnSplittable()).andReturn(tabletMutator); + EasyMock + .expect(tabletMutator.putTabletMergeability( + TabletMergeabilityMetadata.always(SteadyTime.from(1, TimeUnit.SECONDS)))) + .andReturn(tabletMutator).once(); }); @@ -376,17 +386,17 @@ public class MergeTabletsTest { .putTime(MetadataTime.parse(times[0])).putTabletAvailability(TabletAvailability.HOSTED) .build(LOCATION, LOGS, FILES, ECOMP, MERGED, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED, SCANS, HOSTING_REQUESTED, SUSPEND, LAST, - UNSPLITTABLE); + UNSPLITTABLE, MERGEABILITY); var tablet2 = TabletMetadata.builder(ke2).putOperation(opid).putDirName("td2") .putTime(MetadataTime.parse(times[1])).putTabletAvailability(TabletAvailability.HOSTED) .build(LOCATION, LOGS, FILES, ECOMP, MERGED, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED, SCANS, 
HOSTING_REQUESTED, SUSPEND, LAST, - UNSPLITTABLE); + UNSPLITTABLE, MERGEABILITY); var tablet3 = TabletMetadata.builder(ke3).putOperation(opid).putDirName("td3") .putTime(MetadataTime.parse(times[2])).putTabletAvailability(TabletAvailability.HOSTED) .build(LOCATION, LOGS, FILES, ECOMP, MERGED, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED, LOADED, CLONED, SCANS, HOSTING_REQUESTED, SUSPEND, LAST, - UNSPLITTABLE); + UNSPLITTABLE, MERGEABILITY); testMerge(List.of(tablet1, tablet2, tablet3), tableId, null, null, tabletMutator -> { EasyMock.expect(tabletMutator.putTime(MetadataTime.parse("L30"))).andReturn(tabletMutator) @@ -396,6 +406,9 @@ public class MergeTabletsTest { EasyMock.expect(tabletMutator.putPrevEndRow(ke1.prevEndRow())).andReturn(tabletMutator) .once(); EasyMock.expect(tabletMutator.setMerged()).andReturn(tabletMutator).once(); + // Current default if not set is NEVER + EasyMock.expect(tabletMutator.putTabletMergeability(TabletMergeabilityMetadata.never())) + .andReturn(tabletMutator).once(); }); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index a317f8375a..82f2e5949f 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.data.TableId; @@ -49,12 +50,14 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataTime; import 
org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.split.Splitter; import org.apache.accumulo.server.ServerContext; @@ -83,13 +86,13 @@ public class UpdateTabletsTest { * developer has determined that split code can handle that column OR has opened an issue about * handling it. */ - private static final Set<ColumnType> COLUMNS_HANDLED_BY_SPLIT = - EnumSet.of(ColumnType.TIME, ColumnType.LOGS, ColumnType.FILES, ColumnType.PREV_ROW, - ColumnType.OPID, ColumnType.LOCATION, ColumnType.ECOMP, ColumnType.SELECTED, - ColumnType.LOADED, ColumnType.USER_COMPACTION_REQUESTED, ColumnType.MERGED, - ColumnType.LAST, ColumnType.SCANS, ColumnType.DIR, ColumnType.CLONED, ColumnType.FLUSH_ID, - ColumnType.FLUSH_NONCE, ColumnType.SUSPEND, ColumnType.AVAILABILITY, - ColumnType.HOSTING_REQUESTED, ColumnType.COMPACTED, ColumnType.UNSPLITTABLE); + private static final Set<ColumnType> COLUMNS_HANDLED_BY_SPLIT = EnumSet.of(ColumnType.TIME, + ColumnType.LOGS, ColumnType.FILES, ColumnType.PREV_ROW, ColumnType.OPID, ColumnType.LOCATION, + ColumnType.ECOMP, ColumnType.SELECTED, ColumnType.LOADED, + ColumnType.USER_COMPACTION_REQUESTED, ColumnType.MERGED, ColumnType.LAST, ColumnType.SCANS, + ColumnType.DIR, ColumnType.CLONED, ColumnType.FLUSH_ID, ColumnType.FLUSH_NONCE, + ColumnType.SUSPEND, ColumnType.AVAILABILITY, ColumnType.HOSTING_REQUESTED, + ColumnType.COMPACTED, 
ColumnType.UNSPLITTABLE, ColumnType.MERGEABILITY); /** * The purpose of this test is to catch new tablet metadata columns that were added w/o @@ -230,6 +233,8 @@ public class UpdateTabletsTest { EasyMock.expect(splitter.getCachedFileInfo(tableId, file3)).andReturn(newFileInfo("d", "f")); EasyMock.expect(splitter.getCachedFileInfo(tableId, file4)).andReturn(newFileInfo("d", "j")); EasyMock.expect(manager.getSplitter()).andReturn(splitter).atLeastOnce(); + EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) + .atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); @@ -294,6 +299,11 @@ public class UpdateTabletsTest { EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20))) .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putFile(file2, dfv2)).andReturn(tablet1Mutator); + // SplitInfo marked as system generated so should be set to ALWAYS (0 delay) + EasyMock + .expect(tablet1Mutator.putTabletMergeability( + TabletMergeabilityMetadata.always(SteadyTime.from(100_000, TimeUnit.SECONDS)))) + .andReturn(tablet1Mutator); tablet1Mutator.submit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); EasyMock.expect(tabletsMutator.mutateTablet(newExtent1)).andReturn(tablet1Mutator); @@ -310,6 +320,11 @@ public class UpdateTabletsTest { EasyMock.expect(tablet2Mutator.putCompacted(ucfid1)).andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putCompacted(ucfid3)).andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putTabletAvailability(availability)).andReturn(tablet2Mutator); + // SplitInfo marked as system generated so should be set to ALWAYS (0 delay) + EasyMock + .expect(tablet2Mutator.putTabletMergeability( + TabletMergeabilityMetadata.always(SteadyTime.from(100_000, TimeUnit.SECONDS)))) + .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putBulkFile(loaded1.getTabletFile(), flid1)) 
.andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) @@ -367,7 +382,7 @@ public class UpdateTabletsTest { // the original tablet SortedSet<Text> splits = new TreeSet<>(List.of(newExtent1.endRow(), newExtent2.endRow())); UpdateTablets updateTablets = - new UpdateTablets(new SplitInfo(origExtent, splits), List.of(dir1, dir2)); + new UpdateTablets(new SplitInfo(origExtent, splits, true), List.of(dir1, dir2)); updateTablets.call(fateId, manager); EasyMock.verify(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator, @@ -446,7 +461,7 @@ public class UpdateTabletsTest { // the original tablet SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"))); UpdateTablets updateTablets = - new UpdateTablets(new SplitInfo(origExtent, splits), List.of("d1")); + new UpdateTablets(new SplitInfo(origExtent, splits, true), List.of("d1")); updateTablets.call(fateId, manager); EasyMock.verify(manager, context, ample); diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java index 2c8359bf4e..4559301235 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleIT.java @@ -119,6 +119,7 @@ public class TestAmpleIT extends SharedMiniClusterBase { assertNotNull(tm.getExtent()); assertNotNull(tm.getTabletAvailability()); assertNotNull(tm.getTime()); + assertNotNull(tm.getTabletMergeability()); count.incrementAndGet(); }); } diff --git a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java index 0b1c2b9e84..4d8f8c44fd 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java @@ -53,6 +53,7 @@ import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions; import org.apache.accumulo.core.security.Authorizations; @@ -149,6 +150,7 @@ public class TestAmple { tabletMutator.putDirName(dirName); tabletMutator.putTime(new MetadataTime(0, TimeType.MILLIS)); tabletMutator.putTabletAvailability(TabletAvailability.HOSTED); + tabletMutator.putTabletMergeability(TabletMergeabilityMetadata.never()); tabletMutator.mutate(); } catch (Exception e) { throw new IllegalStateException(e); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java index a0cf45ee79..3b524d90bc 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -198,7 +198,7 @@ public class ManagerRepoIT extends SharedMiniClusterBase { assertEquals(opid, testAmple.readTablet(extent).getOperationId()); var eoRepo = new AllocateDirsAndEnsureOnline( - new SplitInfo(extent, new TreeSet<>(List.of(new Text("sp1"))))); + new SplitInfo(extent, new TreeSet<>(List.of(new Text("sp1"))), true)); // The repo should delete the opid and throw an exception assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java index 76c62a5460..72ff2bb8ca 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.functional; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.time.Duration; import java.util.Collection; import java.util.Iterator; @@ -30,7 +32,10 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; @@ -87,6 +92,13 @@ public class AddSplitIT extends AccumuloClusterHarness { } verifyData(c, tableName, 2L); + + TableId id = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + try (TabletsMetadata tm = getServerContext().getAmple().readTablets().forTable(id).build()) { + // Default for user created tablets should be mergeability set to NEVER + tm.stream().forEach(tablet -> assertEquals(TabletMergeabilityMetadata.never(), + tablet.getTabletMergeability())); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java index 5a4dd306cc..e805143e67 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; @@ -284,10 +285,12 @@ public class MetadataIT extends AccumuloClusterHarness { assertEquals(maxVersions, tableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions")); - // Verify all tablets are HOSTED + // Verify all tablets are HOSTED and Mergeability is NEVER try (var tablets = client.getAmple().readTablets().forTable(tableId).build()) { assertTrue( tablets.stream().allMatch(tm -> tm.getTabletAvailability() == TabletAvailability.HOSTED)); + assertTrue(tablets.stream() + .allMatch(tm -> tm.getTabletMergeability().equals(TabletMergeabilityMetadata.never()))); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index 720e56f93f..a985d3969a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@ -57,6 +57,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.conf.Property; @@ -68,6 +69,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.AccumuloTable; import
org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -250,6 +252,7 @@ public class SplitIT extends AccumuloClusterHarness { KeyExtent extent = new KeyExtent(id, null, null); s.setRange(extent.toMetaRange()); TabletColumnFamily.PREV_ROW_COLUMN.fetch(s); + TabletColumnFamily.MERGEABILITY_COLUMN.fetch(s); int count = 0; int shortened = 0; for (Entry<Key,Value> entry : s) { @@ -257,6 +260,15 @@ public class SplitIT extends AccumuloClusterHarness { if (extent.endRow() != null && extent.endRow().toString().length() < 14) { shortened++; } + if (TabletColumnFamily.MERGEABILITY_COLUMN.getColumnQualifier() + .equals(entry.getKey().getColumnQualifier())) { + // Default tablet should be set to NEVER, all newly generated system splits should be + // set to ALWAYS + var mergeability = + extent.endRow() == null ? TabletMergeability.never() : TabletMergeability.always(); + assertEquals(mergeability, + TabletMergeabilityMetadata.fromValue(entry.getValue()).getTabletMergeability()); + } count++; }