This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 88b108e9ac Narrow the files checked by compaction commit (#5153)
88b108e9ac is described below

commit 88b108e9ac8fe00b231d53cf236eb794b448d922
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Mon Dec 9 08:11:22 2024 -0500

    Narrow the files checked by compaction commit (#5153)
    
    Previously the conditional mutation to commit a compaction would require
    all files in the tablet be the same as read earlier and on a busy tablet
    this could fail and retry often. The check has now been narrowed to only
    verify that the files involved with the compaction still exist. A new
    method was added to Ample called requireFiles(Set<StoredTabletFile> files)
    which creates a condition for each file column to verify each one
    exists.
    
    This closes #5117
---
 .../accumulo/core/metadata/schema/Ample.java       |  6 +++
 .../metadata/ConditionalTabletMutatorImpl.java     | 14 ++++++
 .../coordinator/commit/CommitCompaction.java       |  5 ++-
 .../coordinator/commit/CompactionCommitData.java   |  5 +--
 .../test/functional/AmpleConditionalWriterIT.java  | 51 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 888775aed5..9528ec9974 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.metadata.schema;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
@@ -514,6 +515,11 @@ public interface Ample {
 
     ConditionalTabletMutator requireAbsentLogs();
 
+    /**
+     * Require that a tablet contain all the files in the set
+     */
+    ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);
+
     /**
      * <p>
      * Ample provides the following features on top of the conditional writer 
to help automate
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 9978dbbe7c..da8e050504 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -45,7 +45,9 @@ import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.iterators.SortedFilesIterator;
 import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
@@ -330,6 +332,18 @@ public class ConditionalTabletMutatorImpl extends 
TabletMutatorBase<Ample.Condit
     return this;
   }
 
+  @Override
+  public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
+    Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
+    IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, 
PresentIterator.class);
+    for (StoredTabletFile file : files) {
+      Condition c = new Condition(DataFileColumnFamily.STR_NAME, 
file.getMetadata())
+          .setValue(PresentIterator.VALUE).setIterators(is);
+      mutation.addCondition(c);
+    }
+    return this;
+  }
+
   @Override
   public void submit(Ample.RejectionHandler rejectionCheck) {
     Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 0daa221f1f..cb9492d7e3 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -110,7 +110,7 @@ public class CommitCompaction extends ManagerRepo {
     while (canCommitCompaction(ecid, tablet)) {
       CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
 
-      // the compacted files should not exists in the tablet already
+      // the compacted files should not exist in the tablet already
       var tablet2 = tablet;
       newDatafile.ifPresent(
           newFile -> 
Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
@@ -118,7 +118,8 @@ public class CommitCompaction extends ManagerRepo {
 
       try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
         var tabletMutator = 
tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
-            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
+            .requireCompaction(ecid).requireSame(tablet, LOCATION)
+            .requireFiles(commitData.getJobFiles());
 
         if (ecm.getKind() == CompactionKind.USER) {
           tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
index 23b293c25e..0e7587d633 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.manager.compaction.coordinator.commit;
 
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -56,7 +55,7 @@ public class CompactionCommitData implements Serializable {
     return KeyExtent.fromThrift(textent).tableId();
   }
 
-  public Collection<StoredTabletFile> getJobFiles() {
-    return 
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList());
+  public Set<StoredTabletFile> getJobFiles() {
+    return 
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet());
   }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index a343b94854..ca0baf7435 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -1746,4 +1746,55 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
       }
     }
   }
+
+  @Test
+  public void testRequiresFiles() {
+    var context = cluster.getServerContext();
+
+    var stf1 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+    var stf2 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
+    var stf3 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
+    var stf4 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
+    var dfv = new DataFileValue(100, 100);
+
+    // Add 3 of the files, skip the 4th file
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, 
dfv).putFile(stf2, dfv)
+          .putFile(stf3, dfv).submit(tm -> false);
+      assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+    }
+    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
+
+    // Test mutation is accepted when given all files
+    var time1 = MetadataTime.parse("L50");
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf2, stf3))
+          .putTime(time1).submit(tm -> false);
+      assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+    }
+    assertEquals(time1, context.getAmple().readTablet(e1).getTime());
+
+    // Test mutation is accepted when a subset of files is given
+    var time2 = MetadataTime.parse("L60");
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf3)).putTime(time2)
+          .submit(tm -> false);
+      assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+    }
+    assertEquals(time2, context.getAmple().readTablet(e1).getTime());
+
+    // Test mutation is rejected when a file is given that the tablet does not 
have
+    var time3 = MetadataTime.parse("L60");
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, 
stf4)).putTime(time3)
+          .submit(tm -> false);
+      assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+    }
+    // Should be previous time still as the mutation was rejected
+    assertEquals(time2, context.getAmple().readTablet(e1).getTime());
+  }
 }

Reply via email to