This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 802b4d689c narrows check of loaded files in conditional mutation 
(#5166)
802b4d689c is described below

commit 802b4d689c32db84edd5370f9ea836d7b9e7c1a8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Dec 12 13:25:31 2024 -0500

    narrows check of loaded files in conditional mutation (#5166)
    
    Bulk load fate code was reading a tablet's loaded flags, checking that
    the files to load were not among them, and then making a conditional
    mutation that required the set of bulk flags to be the same.  Requiring
    the set to be the same caused unnecessary collisions between bulk
    imports.  Modified the conditional check to require only that the
    loaded flags for the fate operation be absent.
---
 .../accumulo/core/metadata/schema/Ample.java       |  5 ++
 .../metadata/ConditionalTabletMutatorImpl.java     | 11 ++++
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  4 +-
 .../test/functional/AmpleConditionalWriterIT.java  | 61 ++++++++++++++++++++++
 4 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 699ee319a1..b29b879d89 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -525,6 +525,11 @@ public interface Ample {
      */
     ConditionalTabletMutator requireLessOrEqualsFiles(long limit);
 
+    /**
+     * Requires that a tablet not have these loaded flags set.
+     */
+    ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> 
files);
+
     /**
      * <p>
      * Ample provides the following features on top of the conditional writer 
to help automate
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 8cee60d96d..ff54603a82 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.iterators.SortedFilesIterator;
 import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
@@ -353,6 +354,16 @@ public class ConditionalTabletMutatorImpl extends 
TabletMutatorBase<Ample.Condit
     return this;
   }
 
+  @Override
+  public ConditionalTabletMutator 
requireAbsentLoaded(Set<ReferencedTabletFile> files) {
+    Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
+    for (ReferencedTabletFile file : files) {
+      Condition c = new Condition(BulkFileColumnFamily.STR_NAME, 
file.insert().getMetadata());
+      mutation.addCondition(c);
+    }
+    return this;
+  }
+
   @Override
   public void submit(Ample.RejectionHandler rejectionCheck) {
     Preconditions.checkState(updatesEnabled, "Cannot make updates after 
calling mutate.");
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 7b0f494c19..5ff2ca3ec3 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -174,7 +174,6 @@ class LoadFiles extends ManagerRepo {
       }
 
       List<ColumnType> rsc = new ArrayList<>();
-      rsc.add(LOCATION);
       if (setTime) {
         rsc.add(TIME);
       }
@@ -232,7 +231,8 @@ class LoadFiles extends ManagerRepo {
         }
 
         var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
-            .requireAbsentOperation().requireSame(tablet, LOADED, 
requireSameCols);
+            .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
+            .requireSame(tablet, LOCATION, requireSameCols);
 
         if (pauseLimit > 0) {
           tabletMutator.requireLessOrEqualsFiles(pauseLimit);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index c1bc46921a..2cbb819a05 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -1874,4 +1874,65 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
     assertEquals(time4, context.getAmple().readTablet(e1).getTime());
 
   }
+
+  @Test
+  public void testRequireAbsentLoaded() {
+    var context = cluster.getServerContext();
+
+    var stf1 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
+    var stf2 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
+    var stf3 = StoredTabletFile
+        .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
+    var dfv = new DataFileValue(100, 100);
+
+    FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation()
+          .requireAbsentLoaded(Set.of(stf1.getTabletFile(), 
stf2.getTabletFile()))
+          .putBulkFile(stf1.getTabletFile(), 
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
+          .putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false);
+      assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+    }
+    assertEquals(Set.of(stf1, stf2), 
context.getAmple().readTablet(e1).getFiles());
+    assertEquals(Map.of(stf1, fateId1, stf2, fateId1),
+        context.getAmple().readTablet(e1).getLoaded());
+
+    FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation()
+          .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
+          .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, 
dfv).submit(tm -> false);
+      assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
+    }
+    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
+    assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
+        context.getAmple().readTablet(e1).getLoaded());
+
+    // should fail because the loaded markers are present
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation()
+          .requireAbsentLoaded(Set.of(stf1.getTabletFile(), 
stf2.getTabletFile()))
+          .putBulkFile(stf1.getTabletFile(), 
fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
+          .putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> 
false);
+      assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+    }
+
+    // should fail because the loaded markers are present
+    try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
+      ctmi.mutateTablet(e1).requireAbsentOperation()
+          .requireAbsentLoaded(Set.of(stf3.getTabletFile()))
+          .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, 
dfv).putFlushId(99)
+          .submit(tm -> false);
+      assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+    }
+
+    assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles());
+    assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
+        context.getAmple().readTablet(e1).getLoaded());
+    assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
+  }
 }

Reply via email to