This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 802b4d689c narrows check of loaded files in conditional mutation (#5166) 802b4d689c is described below commit 802b4d689c32db84edd5370f9ea836d7b9e7c1a8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Dec 12 13:25:31 2024 -0500 narrows check of loaded files in conditional mutation (#5166) Bulk load fate code was reading a tablet's loaded flags, checking that the fate operation was not already present in them, and then making a conditional mutation that required the set of loaded flags to be the same. Requiring the set to be the same caused unnecessary collisions between bulk imports. Modified the conditional check to require the loaded flags for the fate operation to be absent. --- .../accumulo/core/metadata/schema/Ample.java | 5 ++ .../metadata/ConditionalTabletMutatorImpl.java | 11 ++++ .../manager/tableOps/bulkVer2/LoadFiles.java | 4 +- .../test/functional/AmpleConditionalWriterIT.java | 61 ++++++++++++++++++++++ 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 699ee319a1..b29b879d89 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -525,6 +525,11 @@ public interface Ample { */ ConditionalTabletMutator requireLessOrEqualsFiles(long limit); + /** + * Requires that a tablet not have these loaded flags set. 
+ */ + ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files); + /** * <p> * Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 8cee60d96d..ff54603a82 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; @@ -353,6 +354,16 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit return this; } + @Override + public ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + for (ReferencedTabletFile file : files) { + Condition c = new Condition(BulkFileColumnFamily.STR_NAME, file.insert().getMetadata()); + mutation.addCondition(c); + } + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 
7b0f494c19..5ff2ca3ec3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -174,7 +174,6 @@ class LoadFiles extends ManagerRepo { } List<ColumnType> rsc = new ArrayList<>(); - rsc.add(LOCATION); if (setTime) { rsc.add(TIME); } @@ -232,7 +231,8 @@ class LoadFiles extends ManagerRepo { } var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) - .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols); + .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet()) + .requireSame(tablet, LOCATION, requireSameCols); if (pauseLimit > 0) { tabletMutator.requireLessOrEqualsFiles(pauseLimit); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index c1bc46921a..2cbb819a05 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1874,4 +1874,65 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(time4, context.getAmple().readTablet(e1).getTime()); } + + @Test + public void testRequireAbsentLoaded() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var dfv = new DataFileValue(100, 100); + + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + 
ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1), + context.getAmple().readTablet(e1).getLoaded()); + + FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).putFlushId(99) + .submit(tm -> false); + assertEquals(Status.REJECTED, 
ctmi.process().get(e1).getStatus()); + } + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + } }