This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8c99aff2d672e3fe0120ca4cfcae0058ec2e7dad Merge: 063d12057a b63afacfa6 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Mon Jun 17 15:01:46 2024 -0400 Merge remote-tracking branch 'upstream/main' into elasticity .../util/compaction/RunningCompactionInfo.java | 21 ++--- .../compaction/thrift/TCompactionStatusUpdate.java | 104 ++++++++++++++++++++- core/src/main/thrift/compaction-coordinator.thrift | 1 + .../core/metadata/schema/TabletMetadataTest.java | 27 ++++++ .../accumulo/server/compaction/CompactionInfo.java | 3 +- .../accumulo/server/compaction/FileCompactor.java | 17 +++- .../org/apache/accumulo/compactor/Compactor.java | 37 ++++++-- .../shell/commands/ActiveCompactionHelper.java | 30 +++--- .../shell/commands/ListCompactionsCommand.java | 2 +- .../compaction/ExternalCompactionProgressIT.java | 10 +- .../compaction/ExternalDoNothingCompactor.java | 6 ++ 11 files changed, 203 insertions(+), 55 deletions(-) diff --cc core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index d22925afc3,586f697ae8..9f4ba14def --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@@ -22,20 -22,18 +22,21 @@@ import static java.util.stream.Collecto import static org.apache.accumulo.core.metadata.StoredTabletFile.serialize; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_VALUE; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; 
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_NONCE_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.AVAILABILITY; + import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@@ -431,144 -409,38 +432,170 @@@ public class TabletMetadataTest assertEquals(1, tm2.getScans().size()); assertThrows(UnsupportedOperationException.class, () -> 
tm2.getScans().add(stf)); assertEquals(1, tm2.getLoaded().size()); - assertThrows(UnsupportedOperationException.class, () -> tm2.getLoaded().put(stf, 0L)); + assertThrows(UnsupportedOperationException.class, + () -> tm2.getLoaded().put(stf, FateId.from(FateInstanceType.USER, UUID.randomUUID()))); assertEquals(1, tm2.getKeyValues().size()); assertThrows(UnsupportedOperationException.class, () -> tm2.getKeyValues().remove(null)); + assertEquals(1, tm2.getCompacted().size()); + assertThrows(UnsupportedOperationException.class, + () -> tm2.getCompacted().add(FateId.from(FateInstanceType.USER, UUID.randomUUID()))); + assertEquals(1, tm2.getUserCompactionsRequested().size()); + assertThrows(UnsupportedOperationException.class, () -> tm2.getUserCompactionsRequested() + .add(FateId.from(FateInstanceType.USER, UUID.randomUUID()))); + } + + @Test + public void testCompactionRequestedColumn() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + FateId userCompactFateId1 = FateId.from(type, UUID.randomUUID()); + FateId userCompactFateId2 = FateId.from(type, UUID.randomUUID()); + + // Test column set + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, userCompactFateId1.canonical(), ""); + mutation.put(UserCompactionRequestedColumnFamily.STR_NAME, userCompactFateId2.canonical(), ""); + + TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(USER_COMPACTION_REQUESTED), true, false); + assertEquals(2, tm.getUserCompactionsRequested().size()); + assertTrue(tm.getUserCompactionsRequested().contains(userCompactFateId1)); + assertTrue(tm.getUserCompactionsRequested().contains(userCompactFateId2)); + + // Column not set + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = 
TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(USER_COMPACTION_REQUESTED), true, false); + assertTrue(tm.getUserCompactionsRequested().isEmpty()); + + // Column not fetched + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.PREV_ROW), true, false); + assertThrows(IllegalStateException.class, tm::getUserCompactionsRequested); + } + + @Test + public void testUnsplittableColumn() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); + StoredTabletFile sf2 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")); + StoredTabletFile sf3 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf")); + // Same path as sf3 but with a range + StoredTabletFile sf4 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf"), + new Range("a", false, "b", true)); + + // Test with files + var unsplittableMeta1 = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf3)); + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta1.toBase64())); + TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertUnsplittable(unsplittableMeta1, tm.getUnSplittable(), true); + + // Test empty file set + var unsplittableMeta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of()); + mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta2.toBase64())); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + 
assertUnsplittable(unsplittableMeta2, tm.getUnSplittable(), true); + + // Make sure not equals works as well + assertUnsplittable(unsplittableMeta1, unsplittableMeta2, false); + + // Test with ranges + // use sf4, which has the same path as sf3 plus a range, instead of sf3 + var unsplittableMeta3 = + UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, sf2, sf4)); + mutation = TabletColumnFamily.createPrevRowMutation(extent); + SplitColumnFamily.UNSPLITTABLE_COLUMN.put(mutation, new Value(unsplittableMeta3.toBase64())); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertUnsplittable(unsplittableMeta3, tm.getUnSplittable(), true); + + // make sure not equals when all the file paths are equal but one has a range + assertUnsplittable(unsplittableMeta1, unsplittableMeta3, false); + + // Column not set + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(UNSPLITTABLE), true, false); + assertNull(tm.getUnSplittable()); + // Column not fetched + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.PREV_ROW), true, false); + assertThrows(IllegalStateException.class, tm::getUnSplittable); + } + + @Test + public void testUnsplittableWithRange() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + // Files with same path and different ranges + StoredTabletFile sf1 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")); + StoredTabletFile sf2 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"), + new Range("a", false, "b", true)); + StoredTabletFile sf3 = StoredTabletFile.of(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf"), + new Range("a", false, "d", true)); + + var meta1 = UnSplittableMetadata.toUnSplittable(extent, 
100, 110, 120, Set.of(sf1)); + var meta2 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf2)); + var meta3 = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf3)); + + // compare each against the others to make sure not equal + assertUnsplittable(meta1, meta2, false); + assertUnsplittable(meta1, meta3, false); + assertUnsplittable(meta2, meta3, false); + } + + private void assertUnsplittable(UnSplittableMetadata meta1, UnSplittableMetadata meta2, + boolean equal) { + assertEquals(equal, meta1.equals(meta2)); + assertEquals(equal, meta1.hashCode() == meta2.hashCode()); + assertEquals(equal, meta1.toBase64().equals(meta2.toBase64())); + } + + @Test + public void testUnknownColFamily() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + + mutation.put("1234567890abcdefg", "xyz", "v1"); + assertThrows(IllegalStateException.class, () -> TabletMetadata + .convertRow(toRowMap(mutation).entrySet().iterator(), EnumSet.of(MERGED), true, false)); } + @Test + public void testAbsentPrevRow() { + // If the prev row is fetched, then it is expected to be seen. Ensure that if it was not seen + // that TabletMetadata fails when attempting to use it. Want to ensure null is not returned for + // this case. 
+ Mutation mutation = + new Mutation(MetadataSchema.TabletsSection.encodeRow(TableId.of("5"), new Text("df"))); + DIRECTORY_COLUMN.put(mutation, new Value("d1")); + SortedMap<Key,Value> rowMap = toRowMap(mutation); + + var tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), - EnumSet.allOf(ColumnType.class), false); ++ EnumSet.allOf(ColumnType.class), false, false); + + var msg = assertThrows(IllegalStateException.class, tm::getExtent).getMessage(); + assertTrue(msg.contains("No prev endrow seen")); + msg = assertThrows(IllegalStateException.class, tm::getPrevEndRow).getMessage(); + assertTrue(msg.contains("No prev endrow seen")); + + // should see a slightly different error message when the prev row is not fetched - tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), EnumSet.of(DIR), false); ++ tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), EnumSet.of(DIR), false, false); + msg = assertThrows(IllegalStateException.class, tm::getExtent).getMessage(); + assertTrue(msg.contains("PREV_ROW was not fetched")); + msg = assertThrows(IllegalStateException.class, tm::getPrevEndRow).getMessage(); + assertTrue(msg.contains("PREV_ROW was not fetched")); + } + private SortedMap<Key,Value> toRowMap(Mutation mutation) { SortedMap<Key,Value> rowMap = new TreeMap<>(); mutation.getUpdates().forEach(cu -> { diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 7896096ce1,26a610a65c..a39a6ca1c7 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -98,10 -97,9 +99,11 @@@ import org.apache.accumulo.core.util.Ut import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; + import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.AbstractServer; +import 
org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.CompactionWatcher; import org.apache.accumulo.server.compaction.FileCompactor; diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index a797faa36e,a7844c05d0..72c0fbe560 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -19,7 -19,8 +19,8 @@@ package org.apache.accumulo.test.compaction; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions; @@@ -357,15 -440,8 +360,14 @@@ public class ExternalCompactionProgress * Check running compaction progress. 
*/ private void checkRunning() throws TException { - - TExternalCompactionList ecList = getRunningCompactions(getCluster().getServerContext()); + ServerContext ctx = getCluster().getServerContext(); + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + - var ecList = getRunningCompactions(ctx, coordinatorHost); - var ecMap = ecList.getCompactions(); ++ TExternalCompactionList ecList = getRunningCompactions(ctx, coordinatorHost); + Map<String,TExternalCompaction> ecMap = ecList.getCompactions(); if (ecMap != null) { ecMap.forEach((ecid, ec) -> { // returns null if it's a new mapping