This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 3310612bc8 Add new metadata column to prevent user compaction starvation (#4254) 3310612bc8 is described below commit 3310612bc8216b2fca311a189b9e68dce8f1e193 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Mon Feb 26 13:33:14 2024 -0500 Add new metadata column to prevent user compaction starvation (#4254) This commit adds a new column that will be set by user compactions during the fate operation if it is detected that it can't run due to a system compaction. This will prevent future system compactions from running until the user compaction is finished. Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/manager/state/TabletManagement.java | 8 +- .../accumulo/core/metadata/schema/Ample.java | 4 + .../core/metadata/schema/MetadataSchema.java | 9 ++ .../core/metadata/schema/TabletMetadata.java | 18 +++- .../metadata/schema/TabletMetadataBuilder.java | 13 +++ .../core/metadata/schema/TabletMutatorBase.java | 13 +++ .../core/metadata/schema/TabletsMetadata.java | 4 + .../core/metadata/schema/TabletMetadataTest.java | 43 +++++++- .../server/constraints/MetadataConstraints.java | 10 +- .../manager/state/TabletManagementIterator.java | 2 + .../metadata/ConditionalTabletMutatorImpl.java | 8 ++ .../constraints/MetadataConstraintsTest.java | 16 ++- .../coordinator/CompactionCoordinator.java | 8 +- .../accumulo/manager/tableOps/compact/CleanUp.java | 19 +++- .../manager/tableOps/compact/CompactionDriver.java | 41 +++++-- .../compaction/ExternalCompactionTestUtils.java | 1 + .../test/functional/AmpleConditionalWriterIT.java | 89 +++++++++++++++ .../accumulo/test/functional/CompactionIT.java | 120 +++++++++++++++++++++ 18 files changed, 398 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java index 04d956dd8c..911dfc8deb 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java @@ -42,10 +42,10 @@ import com.google.common.base.Splitter; */ public class TabletManagement { - public static final EnumSet<ColumnType> CONFIGURED_COLUMNS = - EnumSet.of(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, - ColumnType.AVAILABILITY, ColumnType.HOSTING_REQUESTED, ColumnType.FILES, ColumnType.LAST, - ColumnType.OPID, ColumnType.ECOMP, ColumnType.DIR, ColumnType.SELECTED); + public static final EnumSet<ColumnType> CONFIGURED_COLUMNS = EnumSet.of(ColumnType.PREV_ROW, + ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, ColumnType.AVAILABILITY, + ColumnType.HOSTING_REQUESTED, ColumnType.FILES, ColumnType.LAST, ColumnType.OPID, + ColumnType.ECOMP, ColumnType.DIR, ColumnType.SELECTED, ColumnType.USER_COMPACTION_REQUESTED); private static final Text ERROR_COLUMN_NAME = new Text("ERROR"); private static final Text REASONS_COLUMN_NAME = new Text("REASONS"); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 18d54d1f25..d3f27a7f36 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -415,6 +415,10 @@ public interface Ample { T setMerged(); T deleteMerged(); + + T putUserCompactionRequested(FateId fateId); + + T deleteUserCompactionRequested(FateId fateId); } interface TabletMutator extends TabletUpdates<TabletMutator> { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 852eb257c8..46cbfb7b06 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -421,6 +421,15 @@ public class MetadataSchema { public static final Text NAME = new Text(STR_NAME); } + /** + * Column family for indicating that a user has requested to compact a tablet. The column + * qualifier is expected to contain the fate transaction id that is executing the request. + */ + public static class UserCompactionRequestedColumnFamily { + public static final String STR_NAME = "userRequestToCompact"; + public static final Text NAME = new Text(STR_NAME); + } + // TODO when removing the Upgrader12to13 class in the upgrade package, also remove this class. public static class Upgrade12to13 { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 5ad11b5945..ccdb2acaec 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -72,6 +72,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Sc import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -118,6 +119,7 @@ public class TabletMetadata { private TabletOperationId operationId; private boolean futureAndCurrentLocationSet = false; private Set<FateId> compacted; + private Set<FateId> userCompactionsRequested; public static TabletMetadataBuilder builder(KeyExtent extent) { return new TabletMetadataBuilder(extent); @@ -147,7 +149,8 @@ public class TabletMetadata { HOSTING_REQUESTED, OPID, SELECTED, - COMPACTED + COMPACTED, + USER_COMPACTION_REQUESTED } public static class Location { @@ -358,6 +361,11 @@ public class TabletMetadata { return merged; } + public Set<FateId> getUserCompactionsRequested() { + ensureFetched(ColumnType.USER_COMPACTION_REQUESTED); + return userCompactionsRequested; + } + public TabletAvailability getTabletAvailability() { if (AccumuloTable.ROOT.tableId().equals(getTableId()) || AccumuloTable.METADATA.tableId().equals(getTableId())) { @@ -385,7 +393,8 @@ public class TabletMetadata { .append("extCompactions", extCompactions).append("availability", availability) .append("onDemandHostingRequested", onDemandHostingRequested) .append("operationId", operationId).append("selectedFiles", selectedFiles) - .append("futureAndCurrentLocationSet", futureAndCurrentLocationSet).toString(); + .append("futureAndCurrentLocationSet", futureAndCurrentLocationSet) + .append("userCompactionsRequested", userCompactionsRequested).toString(); } public SortedMap<Key,Value> getKeyValues() { @@ -431,6 +440,7 @@ public class TabletMetadata { final var extCompBuilder = ImmutableMap.<ExternalCompactionId,CompactionMetadata>builder(); final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,FateId>builder(); final var compactedBuilder = ImmutableSet.<FateId>builder(); + final var userCompactionsRequestedBuilder = ImmutableSet.<FateId>builder(); ByteSequence row = null; while (rowIter.hasNext()) { @@ -532,6 +542,9 @@ public class TabletMetadata { case CompactedColumnFamily.STR_NAME: compactedBuilder.add(FateId.from(qual)); break; + case UserCompactionRequestedColumnFamily.STR_NAME: + userCompactionsRequestedBuilder.add(FateId.from(qual)); + break; default: throw new IllegalStateException("Unexpected family " + fam); @@ -551,6 +564,7 @@ public class TabletMetadata { te.logs = logsBuilder.build(); te.extCompactions = extCompBuilder.build(); te.compacted = compactedBuilder.build(); + te.userCompactionsRequested = userCompactionsRequestedBuilder.build(); if (buildKeyValueMap) { te.keyValues = kvBuilder.build(); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index 15583d42ae..55a2107599 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -36,6 +36,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import java.util.Arrays; import java.util.EnumSet; @@ -282,6 +283,18 @@ public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadata throw new UnsupportedOperationException(); } + @Override + public TabletMetadataBuilder putUserCompactionRequested(FateId fateId) { + fetched.add(USER_COMPACTION_REQUESTED); + internalBuilder.putUserCompactionRequested(fateId); + return this; + } + + @Override + public TabletMetadataBuilder deleteUserCompactionRequested(FateId fateId) { + throw new UnsupportedOperationException(); + } + /** * @param extraFetched Anything that was put on the builder will automatically be added to the * fetched set. However, for the case where something was not put and it needs to be diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 04dd6baa50..190439ab2d 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Sc import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -334,6 +335,18 @@ public abstract class TabletMutatorBase<T extends Ample.TabletUpdates<T>> return getThis(); } + @Override + public T putUserCompactionRequested(FateId fateId) { + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, fateId.canonical(), ""); + return getThis(); + } + + @Override + public T deleteUserCompactionRequested(FateId fateId) { + mutation.putDelete(UserCompactionRequestedColumnFamily.STR_NAME, fateId.canonical()); + return getThis(); + } + public void setCloseAfterMutate(AutoCloseable closeable) { this.closeAfterMutate = closeable; } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index c47e9973dd..4fee556643 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -82,6 +82,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; @@ -392,6 +393,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable case COMPACTED: families.add(CompactedColumnFamily.NAME); break; + case USER_COMPACTION_REQUESTED: + families.add(UserCompactionRequestedColumnFamily.NAME); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 36f2ea06e1..7e927749b4 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -32,6 +32,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -70,6 +71,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.La import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; @@ -127,6 +129,8 @@ public class TabletMetadataTest { mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put(""); MERGED_COLUMN.put(mutation, new Value()); + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, FateId.from(type, 17).canonical(), + ""); SortedMap<Key,Value> rowMap = toRowMap(mutation); @@ -157,6 +161,7 @@ public class TabletMetadataTest { assertEquals("M123456789", tm.getTime().encode()); assertEquals(Set.of(sf1, sf2), Set.copyOf(tm.getScans())); assertTrue(tm.hasMerged()); + assertTrue(tm.getUserCompactionsRequested().contains(FateId.from(type, 17))); } @Test @@ -302,6 +307,37 @@ public class TabletMetadataTest { assertThrows(IllegalStateException.class, tm::hasMerged); } + @Test + public void testCompactionRequestedColumn() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + + // Test column set + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, FateId.from(type, 17).canonical(), + ""); + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, FateId.from(type, 18).canonical(), + ""); + + TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(USER_COMPACTION_REQUESTED), true, false); + assertEquals(2, tm.getUserCompactionsRequested().size()); + assertTrue(tm.getUserCompactionsRequested().contains(FateId.from(type, 17))); + assertTrue(tm.getUserCompactionsRequested().contains(FateId.from(type, 18))); + + // Column not set + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(USER_COMPACTION_REQUESTED), true, false); + assertTrue(tm.getUserCompactionsRequested().isEmpty()); + + // Column not fetched + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.PREV_ROW), true, false); + assertThrows(IllegalStateException.class, tm::getUserCompactionsRequested); + } + @Test public void testUnkownColFamily() { KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); @@ -354,7 +390,7 @@ public class TabletMetadataTest { .putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, FateId.from(type, 25)) .putBulkFile(rf2, FateId.from(type, 35)).putFlushId(27).putDirName("dir1").putScan(sf3) .putScan(sf4).putCompacted(FateId.from(type, 17)).putCompacted(FateId.from(type, 23)) - .build(ECOMP, HOSTING_REQUESTED, MERGED); + .build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED); assertEquals(extent, tm.getExtent()); assertEquals(TabletAvailability.UNHOSTED, tm.getTabletAvailability()); @@ -368,6 +404,7 @@ public class TabletMetadataTest { assertEquals(Set.of(), tm.getExternalCompactions().keySet()); assertEquals(Set.of(FateId.from(type, 17L), FateId.from(type, 23L)), tm.getCompacted()); assertFalse(tm.getHostingRequested()); + assertTrue(tm.getUserCompactionsRequested().isEmpty()); assertFalse(tm.hasMerged()); assertThrows(IllegalStateException.class, tm::getOperationId); assertThrows(IllegalStateException.class, tm::getSuspend); @@ -405,7 +442,8 @@ public class TabletMetadataTest { TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1) - .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged().build(); + .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged() + .putUserCompactionRequested(FateId.from(type, 159L)).build(); assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet()); assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles()); @@ -420,6 +458,7 @@ public class TabletMetadataTest { assertFalse(tm3.getSelectedFiles().initiallySelectedAll()); assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue()); assertTrue(tm3.hasMerged()); + assertTrue(tm3.getUserCompactionsRequested().contains(FateId.from(type, 159L))); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 90eb3b6f0a..abb1c7a27d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Upgrade12to13; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.util.ColumnFQ; @@ -111,7 +112,8 @@ public class MetadataConstraints implements Constraint { ExternalCompactionColumnFamily.NAME, CompactedColumnFamily.NAME, CHOPPED, - MergedColumnFamily.NAME + MergedColumnFamily.NAME, + UserCompactionRequestedColumnFamily.NAME ); // @formatter:on @@ -229,7 +231,8 @@ public class MetadataConstraints implements Constraint { if (columnUpdate.getValue().length == 0 && !(columnFamily.equals(ScanFileColumnFamily.NAME) || columnFamily.equals(LogColumnFamily.NAME) || TabletColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier) - || columnFamily.equals(CompactedColumnFamily.NAME))) { + || columnFamily.equals(CompactedColumnFamily.NAME) + || columnFamily.equals(UserCompactionRequestedColumnFamily.NAME))) { violations = addViolation(violations, 6); } @@ -267,7 +270,8 @@ public class MetadataConstraints implements Constraint { } catch (RuntimeException e) { violations = addViolation(violations, 11); } - } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { + } else if (CompactedColumnFamily.NAME.equals(columnFamily) + || UserCompactionRequestedColumnFamily.NAME.equals(columnFamily)) { if (!FateId.isFateId(columnQualifier.toString())) { violations = addViolation(violations, 13); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 69fbbdd1b0..cd264d7654 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; @@ -137,6 +138,7 @@ public class TabletManagementIterator extends SkippingIterator { scanner.fetchColumnFamily(DataFileColumnFamily.NAME); scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); ServerColumnFamily.OPID_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(UserCompactionRequestedColumnFamily.NAME); scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 34ecdeeebd..6003fd73e6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Co import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -231,6 +232,13 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit mutation.addCondition(c); } break; + case USER_COMPACTION_REQUESTED: { + Condition c = + SetEqualityIterator.createCondition(tabletMetadata.getUserCompactionsRequested(), + fTid -> fTid.canonical().getBytes(UTF_8), UserCompactionRequestedColumnFamily.NAME); + mutation.addCondition(c); + } + break; default: throw new UnsupportedOperationException("Column type " + type + " is not supported."); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 5360a54e95..89e22a09de 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.fs.Path; @@ -492,18 +493,29 @@ public class MetadataConstraintsTest { @Test public void testCompacted() { + testFateCqValidation(CompactedColumnFamily.STR_NAME); + } + + @Test + public void testUserCompactionRequested() { + testFateCqValidation(UserCompactionRequestedColumnFamily.STR_NAME); + } + + // Verify that columns that store a FateId in their CQ + // validate and only allow a correctly formatted FateId + private void testFateCqValidation(String column) { MetadataConstraints mc = new MetadataConstraints(); Mutation m; List<Short> violations; FateId fateId = FateId.from(FateInstanceType.META, 45L); m = new Mutation(new Text("0;foo")); - m.put(CompactedColumnFamily.STR_NAME, fateId.canonical(), ""); + m.put(column, fateId.canonical(), ""); violations = mc.check(createEnv(), m); assertNull(violations); m = new Mutation(new Text("0;foo")); - m.put(CompactedColumnFamily.STR_NAME, "incorrect data", ""); + m.put(column, "incorrect data", ""); violations = mc.check(createEnv(), m); assertNotNull(violations); assertEquals(1, violations.size()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 3bf393d1eb..8d54fe376c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -409,7 +409,13 @@ public class CompactionCoordinator switch (job.getKind()) { case SYSTEM: - if (tablet.getSelectedFiles() != null + var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); + if (userRequestedCompactions > 0) { + LOG.debug( + "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions", + tablet.getExtent(), userRequestedCompactions); + return false; + } else if (tablet.getSelectedFiles() != null && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) { return false; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java index b74875fda6..ce2a5f1ab3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.compact; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import java.time.Duration; import java.util.concurrent.atomic.AtomicLong; @@ -74,16 +75,24 @@ public class CleanUp extends ManagerRepo { try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACTED).checkConsistency().build(); + .fetch(PREV_ROW, COMPACTED, USER_COMPACTION_REQUESTED).checkConsistency().build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { t1 = System.nanoTime(); for (TabletMetadata tablet : tablets) { total++; - if (tablet.getCompacted().contains(fateId)) { - tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, COMPACTED).deleteCompacted(fateId) - .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(fateId)); + if (tablet.getCompacted().contains(fateId) + || tablet.getUserCompactionsRequested().contains(fateId)) { + var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() + .requireSame(tablet, COMPACTED, USER_COMPACTION_REQUESTED); + if (tablet.getCompacted().contains(fateId)) { + mutator.deleteCompacted(fateId); + } + if (tablet.getUserCompactionsRequested().contains(fateId)) { + mutator.deleteUserCompactionRequested(fateId); + } + mutator.submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(fateId) + && !tabletMetadata.getUserCompactionsRequested().contains(fateId)); submitted++; } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 3e5c17b650..4f61047749 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -27,6 +27,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -155,7 +156,8 @@ class CompactionDriver extends ManagerRepo { int noneSelected = 0; int alreadySelected = 0; int otherSelected = 0; - int otherCompaction = 0; + int userCompactionRequested = 0; + int userCompactionWaiting = 0; int selected = 0; KeyExtent minSelected = null; @@ -163,7 +165,8 @@ class CompactionDriver extends ManagerRepo { try ( var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) - .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); + .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID, USER_COMPACTION_REQUESTED) + .checkConsistency().build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), fateId); @@ -256,10 +259,24 @@ class CompactionDriver extends ManagerRepo { tablet.getSelectedFiles().getFateId()); otherSelected++; } - } else { - // ELASTICITY_TODO if there are compactions preventing selection of files, then add + } else if (!tablet.getExternalCompactions().isEmpty()) { + // If there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting - otherCompaction++; + if (!tablet.getUserCompactionsRequested().contains(fateId)) { + log.debug( + "Another compaction exists for {}, Marking {} as needing a user requested compaction", + tablet.getExtent(), fateId); + var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() + .requireSame(tablet, ECOMP, USER_COMPACTION_REQUESTED) + .putUserCompactionRequested(fateId); + mutator.submit(tm -> tm.getUserCompactionsRequested().contains(fateId)); + userCompactionRequested++; + } else { + // Marker was already added and we are waiting + log.debug("Waiting on {} for previously marked user requested compaction {} to run", + tablet.getExtent(), fateId); + userCompactionWaiting++; + } } } } catch (InterruptedException | KeeperException e) { @@ -268,10 +285,12 @@ class CompactionDriver extends ManagerRepo { long t2 = System.currentTimeMillis(); - log.debug("{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{}" - + " selected_by_other:{} no_files:{} none_selected:{} other_compaction:{} opids:{} scan_update_time:{}ms", + log.debug( + "{} tablet stats, total:{} complete:{} selected_now:{} selected_prev:{} selected_by_other:{} " + + "no_files:{} none_selected:{} user_compaction_requested:{} user_compaction_waiting:{} " + + "opids:{} scan_update_time:{}ms", fateId, total, complete, selected, alreadySelected, otherSelected, noFiles, noneSelected, - otherCompaction, opidsSeen, t2 - t1); + userCompactionRequested, userCompactionWaiting, opidsSeen, t2 - t1); if (selected > 0) { manager.getEventCoordinator().event( @@ -330,7 +349,8 @@ class CompactionDriver extends ManagerRepo { Predicate<TabletMetadata> needsUpdate = tabletMetadata -> (tabletMetadata.getSelectedFiles() != null && tabletMetadata.getSelectedFiles().getFateId().equals(fateId)) - || tabletMetadata.getCompacted().contains(fateId); + || tabletMetadata.getCompacted().contains(fateId) + || tabletMetadata.getUserCompactionsRequested().contains(fateId); Predicate<TabletMetadata> needsNoUpdate = needsUpdate.negate(); for (TabletMetadata tablet : tablets) { @@ -346,6 +366,9 @@ class CompactionDriver extends ManagerRepo { if (tablet.getCompacted().contains(fateId)) { mutator.deleteCompacted(fateId); } + if (tablet.getUserCompactionsRequested().contains(fateId)) { + mutator.deleteUserCompactionRequested(fateId); + } mutator.submit(needsNoUpdate::test); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index b91dee97cf..d36b70fad5 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -377,6 +377,7 @@ public class ExternalCompactionTestUtils { assertEquals(Set.of(), tabletMetadata.getCompacted()); assertNull(tabletMetadata.getSelectedFiles()); assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); + assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested()); count++; } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 2b3a8972f1..fa35d5fc21 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -31,6 +31,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.apache.accumulo.core.util.LazySingletons.GSON; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -895,6 +896,94 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } + @Test + public void testUserCompactionRequested() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId45L = FateId.from(type, 45L); + FateId fateId55L = FateId.from(type, 55L); + FateId fateId56L = FateId.from(type, 56L); + FateId fateId65L = FateId.from(type, 65L); + FateId fateId75L = FateId.from(type, 75L); + + var tabletMeta1 = TabletMetadata.builder(e1).build(USER_COMPACTION_REQUESTED); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId55L) + .submit( + tabletMetadata -> tabletMetadata.getUserCompactionsRequested().contains(fateId55L)); + var tabletMeta2 = TabletMetadata.builder(e2).putUserCompactionRequested(fateId45L) + .build(USER_COMPACTION_REQUESTED); + ctmi.mutateTablet(e2).requireAbsentOperation() + .requireSame(tabletMeta2, USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId56L) + .submit( + tabletMetadata -> tabletMetadata.getUserCompactionsRequested().contains(fateId56L)); + + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(fateId55L), tabletMeta1.getUserCompactionsRequested()); + assertEquals(Set.of(), context.getAmple().readTablet(e2).getUserCompactionsRequested()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED).putUserCompactionRequested(fateId65L) + .putUserCompactionRequested(fateId75L).submit(tabletMetadata -> false); + + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), + tabletMeta1.getUserCompactionsRequested()); + + // test require same with a superset + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putUserCompactionRequested(fateId55L) + .putUserCompactionRequested(fateId65L).putUserCompactionRequested(fateId75L) + .putUserCompactionRequested(fateId45L).build(USER_COMPACTION_REQUESTED); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED) + .deleteUserCompactionRequested(fateId55L).deleteUserCompactionRequested(fateId65L) + .deleteUserCompactionRequested(fateId75L).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), + context.getAmple().readTablet(e1).getUserCompactionsRequested()); + + // test require same with a subset + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putUserCompactionRequested(fateId55L) + .putUserCompactionRequested(fateId65L).build(USER_COMPACTION_REQUESTED); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED) + .deleteUserCompactionRequested(fateId55L).deleteUserCompactionRequested(fateId65L) + .deleteUserCompactionRequested(fateId75L).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), + context.getAmple().readTablet(e1).getUserCompactionsRequested()); + + // now use the exact set the tablet has + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putUserCompactionRequested(fateId55L) + .putUserCompactionRequested(fateId65L).putUserCompactionRequested(fateId75L) + .build(USER_COMPACTION_REQUESTED); + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireSame(tabletMeta1, USER_COMPACTION_REQUESTED) + .deleteUserCompactionRequested(fateId55L).deleteUserCompactionRequested(fateId65L) + .deleteUserCompactionRequested(fateId75L).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Set.of(), context.getAmple().readTablet(e1).getUserCompactionsRequested()); + } + } + @Nested public class TestFilter { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index a310b1e08c..4a06574cfc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -46,6 +46,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -76,6 +77,7 @@ import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; @@ -83,6 +85,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -909,6 +912,123 @@ public class CompactionIT extends AccumuloClusterHarness { } } + @Test + public void testUserCompactionRequested() throws Exception { + + String tableName = getUniqueNames(1)[0]; + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // configure tablet compaction iterator that slows compaction down so we can test + // that the USER_COMPACTION_REQUESTED column is set when a user compaction is requested + // when a system compaction is running and blocking + + var ntc = new NewTableConfiguration(); + IteratorSetting iterSetting = new IteratorSetting(50, SlowIterator.class); + SlowIterator.setSleepTime(iterSetting, 1); + ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc)); + ntc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "20")); + client.tableOperations().create(tableName, ntc); + + // Insert MAX_DATA rows + writeRows((ClientContext) client, tableName, MAX_DATA, false); + + // set the compaction ratio 1 to trigger a system compaction + client.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + var extent = new KeyExtent(tableId, null, null); + + AtomicReference<ExternalCompactionId> initialCompaction = new AtomicReference<>(); + + // Wait for the system compaction to start + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + log.debug("Current external compactions {}", externalCompactions.size()); + var current = externalCompactions.keySet().stream().findFirst(); + current.ifPresent(initialCompaction::set); + return current.isPresent(); + }, Wait.MAX_WAIT_MILLIS, 100); + + // Trigger a user compaction which should be blocked by the system compaction + // and should result in the userRequestedCompactions column being set so no more + // system compactions run + client.tableOperations().compact(tableName, new CompactionConfig().setWait(false)); + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var userRequestedCompactions = tabletMeta.getUserCompactionsRequested().size(); + log.debug("Current user requested compaction markers {}", userRequestedCompactions); + return userRequestedCompactions == 1; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Send more data to trigger another system compaction but the user compaction + // should go next and the column marker should block it + writeRows((ClientContext) client, tableName, MAX_DATA, false); + + // Verify that when the next compaction starts it is a USER compaction as the + // SYSTEM compaction should be blocked by the marker + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + log.debug("Waiting for USER compaction to start {}", extent); + + var userRequestedCompactions = tabletMeta.getUserCompactionsRequested().size(); + log.debug("Current user requested compaction markers {}", userRequestedCompactions); + var externalCompactions = tabletMeta.getExternalCompactions(); + log.debug("External compactions size {}", externalCompactions.size()); + var current = externalCompactions.entrySet().stream().findFirst(); + current.ifPresent(c -> log.debug("Current running compaction {}", c.getKey())); + + if (current.isPresent()) { + var currentCompaction = current.orElseThrow(); + // Next compaction started - verify it is a USER compaction and not SYSTEM + if (!current.orElseThrow().getKey().equals(initialCompaction.get())) { + log.debug("Next compaction {} started as type {}", currentCompaction.getKey(), + currentCompaction.getValue().getKind()); + assertEquals(CompactionKind.USER, currentCompaction.getValue().getKind()); + return true; + } + } + return false; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Wait for the user compaction to complete and clear the compactions requested column + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var userRequestedCompactions = tabletMeta.getUserCompactionsRequested().size(); + log.debug("Current user requested compaction markers {}", userRequestedCompactions); + return userRequestedCompactions == 0; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Wait and verify all compactions finish + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions().size(); + log.debug("Current external compactions {}", externalCompactions); + return externalCompactions == 0; + }, Wait.MAX_WAIT_MILLIS, 100); + } + + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), tableName); + } + + private void writeRows(ClientContext client, String tableName, int rows, boolean wait) + throws Exception { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < rows; i++) { + Mutation m = new Mutation(String.format("r:%04d", i)); + m.put("", "", "" + i); + bw.addMutation(m); + + if (i % 75 == 0) { + // create many files as this will cause a system compaction + bw.flush(); + client.tableOperations().flush(tableName, null, null, wait); + } + } + } + client.tableOperations().flush(tableName, null, null, wait); + } + /** * Counts the number of tablets and files in a table. */