This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 88b108e9ac Narrow the files checked by compaction commit (#5153) 88b108e9ac is described below commit 88b108e9ac8fe00b231d53cf236eb794b448d922 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Mon Dec 9 08:11:22 2024 -0500 Narrow the files checked by compaction commit (#5153) Previously, the conditional mutation to commit a compaction required all files in the tablet to be the same as read earlier; on a busy tablet this could fail and retry often. The check has now been narrowed to only verify that the files involved with the compaction still exist. A new method, requireFiles(Set<StoredTabletFile> files), was added to Ample; it creates a condition for each file column to verify that each file exists. This closes #5117 --- .../accumulo/core/metadata/schema/Ample.java | 6 +++ .../metadata/ConditionalTabletMutatorImpl.java | 14 ++++++ .../coordinator/commit/CommitCompaction.java | 5 ++- .../coordinator/commit/CompactionCommitData.java | 5 +-- .../test/functional/AmpleConditionalWriterIT.java | 51 ++++++++++++++++++++++ 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 888775aed5..9528ec9974 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.metadata.schema; import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -514,6 +515,11 @@ public interface Ample { ConditionalTabletMutator requireAbsentLogs(); + /** + * Require that a tablet contain all the files in the set + */ + ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files); + 
/** * <p> * Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 9978dbbe7c..da8e050504 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -45,7 +45,9 @@ import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; @@ -330,6 +332,18 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit return this; } + @Override + public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class); + for (StoredTabletFile file : files) { + Condition c = new Condition(DataFileColumnFamily.STR_NAME, file.getMetadata()) + .setValue(PresentIterator.VALUE).setIterators(is); + mutation.addCondition(c); + } + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates 
after calling mutate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 0daa221f1f..cb9492d7e3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -110,7 +110,7 @@ public class CommitCompaction extends ManagerRepo { while (canCommitCompaction(ecid, tablet)) { CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); - // the compacted files should not exists in the tablet already + // the compacted files should not exist in the tablet already var tablet2 = tablet; newDatafile.ifPresent( newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), @@ -118,7 +118,8 @@ public class CommitCompaction extends ManagerRepo { try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() - .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); + .requireCompaction(ecid).requireSame(tablet, LOCATION) + .requireFiles(commitData.getJobFiles()); if (ecm.getKind() == CompactionKind.USER) { tabletMutator.requireSame(tablet, SELECTED, COMPACTED); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java index 23b293c25e..0e7587d633 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java @@ -19,7 +19,6 @@ 
package org.apache.accumulo.manager.compaction.coordinator.commit; import java.io.Serializable; -import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; @@ -56,7 +55,7 @@ public class CompactionCommitData implements Serializable { return KeyExtent.fromThrift(textent).tableId(); } - public Collection<StoredTabletFile> getJobFiles() { - return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList()); + public Set<StoredTabletFile> getJobFiles() { + return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index a343b94854..ca0baf7435 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1746,4 +1746,55 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } } + + @Test + public void testRequiresFiles() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var stf4 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); + var dfv = new DataFileValue(100, 100); + + // Add 3 of the files, skip the 4th file + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv) + .putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, 
ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + + // Test mutation is accepted when given all files + var time1 = MetadataTime.parse("L50"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf2, stf3)) + .putTime(time1).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time1, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is accepted when a subset of files is given + var time2 = MetadataTime.parse("L60"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf3)).putTime(time2) + .submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is rejected when a file is given that the tablet does not have + var time3 = MetadataTime.parse("L60"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + // Should be previous time still as the mutation was rejected + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + } }