This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 987168fc6a Deduplicate compaction tests (#3665) 987168fc6a is described below commit 987168fc6a935e0f83acb7a806694db94e5906ed Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Jul 28 08:05:41 2023 -0400 Deduplicate compaction tests (#3665) --- .../test/compaction/ExternalCompaction4_IT.java | 7 +- .../compaction/ExternalCompactionTestUtils.java | 31 ++ .../test/compaction/ExternalCompaction_1_IT.java | 97 ++++- .../test/compaction/ExternalCompaction_2_IT.java | 52 ++- .../accumulo/test/functional/CompactionIT.java | 447 +-------------------- 5 files changed, 183 insertions(+), 451 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java index 3543f97810..140f746b99 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java @@ -95,6 +95,9 @@ public class ExternalCompaction4_IT extends AccumuloClusterHarness { client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001"); TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1)); + // In addition to testing errors in compactions, this test also exercises creating lots of + // files to compact. The following will create 1000 files to compact. When changing this test + // try to keep both or create a new test for lots of files to compact. ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 1); Ample ample = ((ClientContext) client).getAmple(); @@ -107,9 +110,11 @@ public class ExternalCompaction4_IT extends AccumuloClusterHarness { client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); + tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES, ColumnType.ECOMP).build(); tm = tms.iterator().next(); assertEquals(1, tm.getFiles().size()); + // ensure the failed compactions did not leave anything in the metadata table + assertEquals(0, tm.getExternalCompactions().size()); ReadWriteIT.verify(client, 1000, 1, 1, 0, table1); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index c9b0108278..6b014c0f21 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -20,6 +20,8 @@ package org.apache.accumulo.test.compaction; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +33,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; @@ -54,6 +57,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -227,8 +231,11 @@ public class ExternalCompactionTestUtils { cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s"); cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); + cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms"); + cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s"); cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10"); cfg.setProperty(Property.MANAGER_FATE_THREADPOOL_SIZE, "10"); + cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "1s"); // use raw local file system so walogs sync and flush will work coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @@ -281,6 +288,14 @@ public class ExternalCompactionTestUtils { return ecids; } + public static long countTablets(ServerContext ctx, String tableName, + Predicate<TabletMetadata> tabletTest) { + var tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName)); + try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) { + return tabletsMetadata.stream().filter(tabletTest).count(); + } + } + public static void waitForRunningCompactions(ServerContext ctx, TableId tid, Set<ExternalCompactionId> idsToWaitFor) throws Exception { @@ -350,4 +365,20 @@ public class ExternalCompactionTestUtils { } } + + public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) { + var tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName)); + var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build(); + + int count = 0; + + for (var tabletMetadata : tabletsMetadata) { + assertEquals(Set.of(), tabletMetadata.getCompacted()); + assertNull(tabletMetadata.getSelectedFiles()); + assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); + count++; + } + + assertTrue(count > 0); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 5085c310b4..af13e5e576 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; @@ -32,9 +33,11 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.ve import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -47,6 +50,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -71,6 +75,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeAll; @@ -78,7 +83,6 @@ import org.junit.jupiter.api.Test; import com.google.common.base.Preconditions; -// ELASTICITY_TODO now that there are only external compactions, could merge some of these ITs that are redundant w/ CompactionIT public class ExternalCompaction_1_IT extends SharedMiniClusterBase { public static class ExternalCompaction1Config implements MiniClusterConfigurationCallback { @@ -135,6 +139,39 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } + @Test + public void testBadSelector() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + final String tableName = getUniqueNames(1)[0]; + NewTableConfiguration tc = new NewTableConfiguration(); + // Ensure compactions don't kick off + tc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10.0")); + c.tableOperations().create(tableName, tc); + // Create multiple RFiles + try (BatchWriter bw = c.createBatchWriter(tableName)) { + for (int i = 1; i <= 4; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put("cf", "cq", new Value()); + bw.addMutation(m); + bw.flush(); + // flush often to create multiple files to compact + c.tableOperations().flush(tableName, null, null, true); + } + } + + CompactionConfig config = new CompactionConfig() + .setSelector(new PluginConfig(ErrorThrowingSelector.class.getName(), Map.of())) + .setWait(true); + assertThrows(AccumuloException.class, () -> c.tableOperations().compact(tableName, config)); + + List<String> rows = new ArrayList<>(); + c.createScanner(tableName).forEach((k, v) -> rows.add(k.getRow().toString())); + assertEquals(List.of("1", "2", "3", "4"), rows); + + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); + } + } + @Test public void testExternalCompaction() throws Exception { String[] names = this.getUniqueNames(2); @@ -264,6 +301,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { .setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(), Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz", CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + "")))); + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); // after compacting with compression, expect small file sizes = CompactionExecutorIT.getFileSizes(client, tableName); @@ -271,6 +309,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { "Unexpected files sizes: data: " + data.length + ", file:" + sizes); client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); // after compacting without compression, expect big files again sizes = CompactionExecutorIT.getFileSizes(client, tableName); @@ -284,6 +323,59 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } } + @Test + public void testConfigurerSetOnTable() throws Exception { + String tableName = this.getUniqueNames(1)[0]; + + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + byte[] data = new byte[100000]; + + Map<String,String> props = Map.of("table.compaction.dispatcher", + SimpleCompactionDispatcher.class.getName(), "table.compaction.dispatcher.opts.service", + "cs5", Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", + Property.TABLE_COMPACTION_CONFIGURER.getKey(), CompressionConfigurer.class.getName(), + Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + + CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, + "gz", Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() + + CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, + "" + data.length); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + client.tableOperations().create(tableName, ntc); + + Arrays.fill(data, (byte) 65); + try (var writer = client.createBatchWriter(tableName)) { + for (int row = 0; row < 10; row++) { + Mutation m = new Mutation(row + ""); + m.at().family("big").qualifier("stuff").put(data); + writer.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + + // without compression, expect file to be large + long sizes = CompactionExecutorIT.getFileSizes(client, tableName); + assertTrue(sizes > data.length * 10 && sizes < data.length * 11, + "Unexpected files sizes : " + sizes); + + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); + + // after compacting with compression, expect small file + sizes = CompactionExecutorIT.getFileSizes(client, tableName); + assertTrue(sizes < data.length, + "Unexpected files sizes: data: " + data.length + ", file:" + sizes); + + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); + + // We need to cancel the compaction or delete the table here because we initiate a user + // compaction above in the test. Even though the external compaction was cancelled + // because we split the table, FaTE will continue to queue up a compaction + client.tableOperations().cancelCompaction(tableName); + } + } + public static class ExtDevNull extends DevNull { @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, @@ -318,6 +410,8 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { assertFalse(s.iterator().hasNext()); } + assertNoCompactionMetadata(getCluster().getServerContext(), table1); + // We need to cancel the compaction or delete the table here because we initiate a user // compaction above in the test. Even though the external compaction was cancelled // because we split the table, FaTE will continue to queue up a compaction @@ -372,6 +466,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { CompactionConfig config = new CompactionConfig().setIterators(List.of(iterSetting)) .setWait(true).setSelector(new PluginConfig(FSelector.class.getName())); client.tableOperations().compact(tableName, config); + assertNoCompactionMetadata(getCluster().getServerContext(), tableName); try (Scanner scanner = client.createScanner(tableName)) { int count = 0; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index 4d1cf1b730..f532451c18 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -25,11 +25,13 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MA import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.countTablets; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -46,10 +48,13 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; @@ -138,7 +143,16 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { createTable(client, table1, "cs3"); TableId tid = getCluster().getServerContext().getTableId(table1); writeData(client, table1); - compact(client, table1, 2, GROUP3, false); + + AtomicReference<Throwable> error = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + compact(client, table1, 2, GROUP3, true); + } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + error.set(e); + } + }); + t.start(); // Wait for the compaction to start by waiting for 1 external compaction column Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils @@ -149,15 +163,28 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { .confirmCompactionRunning(getCluster().getServerContext(), ecids); assertTrue(matches > 0); + // when the compaction starts it will create a selected files column in the tablet, wait for + // that to happen + while (countTablets(getCluster().getServerContext(), table1, + tm -> tm.getSelectedFiles() != null) == 0) { + Thread.sleep(1000); + } + client.tableOperations().cancelCompaction(table1); + t.join(); + Throwable e = error.get(); + assertNotNull(e); + assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); + confirmCompactionCompleted(getCluster().getServerContext(), ecids, TCompactionState.CANCELLED); - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().cancelCompaction(table1); + // ensure the canceled compaction deletes any tablet metadata related to the compaction + while (countTablets(getCluster().getServerContext(), table1, + tm -> tm.getSelectedFiles() != null || !tm.getCompacted().isEmpty()) > 0) { + Thread.sleep(1000); + } } } @@ -181,11 +208,26 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { int matches = confirmCompactionRunning(getCluster().getServerContext(), ecids); assertTrue(matches > 0); + // when the compaction starts it will create a selected files column in the tablet, wait for + // that to happen + while (countTablets(getCluster().getServerContext(), table1, + tm -> tm.getSelectedFiles() != null) == 0) { + Thread.sleep(1000); + } + client.tableOperations().delete(table1); confirmCompactionCompleted(getCluster().getServerContext(), ecids, TCompactionState.CANCELLED); + // ELASTICITY_TODO make delete table fate op get operation ids before deleting + // there should be no metadata for the table, check to see if the compaction wrote anything + // after table delete + try (var scanner = client.createScanner(MetadataTable.NAME)) { + scanner.setRange(MetadataSchema.TabletsSection.getRange(tid)); + assertEquals(0, scanner.stream().count()); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 037488c46f..b45c0337bd 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -21,37 +21,27 @@ package org.apache.accumulo.test.functional; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.NoSuchElementException; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -60,39 +50,30 @@ import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; -import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.user.AgeOffFilter; import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; -import org.apache.accumulo.test.compaction.CompactionExecutorIT; -import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector; +import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -189,114 +170,6 @@ public class CompactionIT extends AccumuloClusterHarness { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } - @Test - public void testBadSelector() throws Exception { - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - final String tableName = getUniqueNames(1)[0]; - NewTableConfiguration tc = new NewTableConfiguration(); - // Ensure compactions don't kick off - tc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10.0")); - c.tableOperations().create(tableName, tc); - // Create multiple RFiles - try (BatchWriter bw = c.createBatchWriter(tableName)) { - for (int i = 1; i <= 4; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put("cf", "cq", new Value()); - bw.addMutation(m); - bw.flush(); - // flush often to create multiple files to compact - c.tableOperations().flush(tableName, null, null, true); - } - } - - CompactionConfig config = new CompactionConfig() - .setSelector(new PluginConfig(ErrorThrowingSelector.class.getName(), Map.of())) - .setWait(true); - assertThrows(AccumuloException.class, () -> c.tableOperations().compact(tableName, config)); - - List<String> rows = new ArrayList<>(); - c.createScanner(tableName).forEach((k, v) -> rows.add(k.getRow().toString())); - assertEquals(List.of("1", "2", "3", "4"), rows); - - assertNoCompactionMetadata(tableName); - } - } - - @Test - public void testCompactionWithTableIterator() throws Exception { - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.tableOperations().create(table1); - try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 1; i <= 4; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put("cf", "cq", new Value()); - bw.addMutation(m); - bw.flush(); - // flush often to create multiple files to compact - client.tableOperations().flush(table1, null, null, true); - } - } - - IteratorSetting setting = new IteratorSetting(50, "delete", DevNull.class); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - - try (Scanner s = client.createScanner(table1)) { - assertFalse(s.iterator().hasNext()); - } - - assertNoCompactionMetadata(table1); - } - } - - @Test - public void testUserCompactionCancellation() throws Exception { - final String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.tableOperations().create(table1); - try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 1; i <= MAX_DATA; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put("cf", "cq", new Value()); - bw.addMutation(m); - bw.flush(); - // flush often to create multiple files to compact - client.tableOperations().flush(table1, null, null, true); - } - } - - final AtomicReference<Exception> error = new AtomicReference<>(); - Thread t = new Thread(() -> { - try { - IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); - setting.addOption("sleepTime", "3000"); - setting.addOption("seekSleepTime", "3000"); - var cconf = new CompactionConfig().setWait(true).setIterators(List.of(setting)); - client.tableOperations().compact(table1, cconf); - } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { - error.set(e); - } - }); - t.start(); - // when the compaction starts it will create a selected files column in the tablet, wait for - // that to happen - while (countTablets(table1, tm -> tm.getSelectedFiles() != null) == 0) { - Thread.sleep(1000); - } - client.tableOperations().cancelCompaction(table1); - t.join(); - Exception e = error.get(); - assertNotNull(e); - assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); - // ensure the canceled compaction deletes any tablet metadata related to the compaction - while (countTablets(table1, - tm -> tm.getSelectedFiles() != null || !tm.getCompacted().isEmpty()) > 0) { - Thread.sleep(1000); - } - } - } - private long countTablets(String tableName, Predicate<TabletMetadata> tabletTest) { var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(tableName)); try (var tabletsMetadata = @@ -305,292 +178,6 @@ public class CompactionIT extends AccumuloClusterHarness { } } - @Test - public void testErrorDuringUserCompaction() throws Exception { - final String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.tableOperations().create(table1); - client.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "1001"); - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001"); - TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1)); - - // In addition to testing errors in compactions, this test also exercises creating lots of - // files to compact. The following will create 1000 files to compact. When changing this test - // try to keep both or create a new test for lots of files to compact. - ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1); - - Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(1000, tm.getFiles().size()); - - IteratorSetting setting = new IteratorSetting(50, "error", ErrorThrowingIterator.class); - setting.addOption(ErrorThrowingIterator.TIMES, "3"); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - - tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES, ColumnType.ECOMP).build(); - tm = tms.iterator().next(); - assertEquals(1, tm.getFiles().size()); - // ensure the failed compactions did not leave anything in the metadata table - assertEquals(0, tm.getExternalCompactions().size()); - - ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1); - } - } - - @Test - public void testErrorDuringCompactionNoOutput() throws Exception { - final String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.tableOperations().create(table1); - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "51"); - TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1)); - - ReadWriteIT.ingest(client, 50, 1, 1, 0, "colf", table1, 1); - ReadWriteIT.verify(client, 50, 1, 1, 0, table1); - - Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(50, tm.getFiles().size()); - - IteratorSetting setting = new IteratorSetting(50, "ageoff", AgeOffFilter.class); - setting.addOption("ttl", "0"); - setting.addOption("currentTime", Long.toString(System.currentTimeMillis() + 86400)); - client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); - - // Since this iterator is on the top, it will throw an error 3 times, then allow the - // ageoff iterator to do its work. - IteratorSetting setting2 = new IteratorSetting(51, "error", ErrorThrowingIterator.class); - setting2.addOption(ErrorThrowingIterator.TIMES, "3"); - client.tableOperations().attachIterator(table1, setting2, EnumSet.of(IteratorScope.majc)); - client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - - assertThrows(NoSuchElementException.class, () -> ample.readTablets().forTable(tid) - .fetch(ColumnType.FILES).build().iterator().next()); - assertEquals(0, client.createScanner(table1).stream().count()); - } - } - - @Test - public void testTableDeletedDuringUserCompaction() throws Exception { - final String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.tableOperations().create(table1); - try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 1; i <= MAX_DATA; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put("cf", "cq", new Value()); - bw.addMutation(m); - bw.flush(); - // flush often to create multiple files to compact - client.tableOperations().flush(table1, null, null, true); - } - } - - final AtomicReference<Exception> error = new AtomicReference<>(); - Thread t = new Thread(() -> { - try { - IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); - setting.addOption("sleepTime", "3000"); - setting.addOption("seekSleepTime", "3000"); - var cconf = new CompactionConfig().setWait(true).setIterators(List.of(setting)); - client.tableOperations().compact(table1, cconf); - } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { - error.set(e); - } - }); - t.start(); - // when the compaction starts it will create a selected files column in the tablet, wait for - // that to happen - while (countTablets(table1, tm -> tm.getSelectedFiles() != null) == 0) { - Thread.sleep(1000); - } - - // grab the table id before deleting the table as its needed for a check later and can not get - // it after delete - var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(table1)); - - client.tableOperations().delete(table1); - t.join(); - Exception e = error.get(); - assertNotNull(e); - assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage()); - - // ELASTICITY_TODO make delete table fate op get operation ids before deleting - // there should be no metadata for the table, check to see if the compaction wrote anything - // after table delete - try (var scanner = client.createScanner(MetadataTable.NAME)) { - scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); - assertEquals(0, scanner.stream().count()); - } - } - } - - @Test - public void testPartialCompaction() throws Exception { - String tableName = getUniqueNames(1)[0]; - try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - - client.tableOperations().create(tableName); - - // Insert MAX_DATA rows - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = 0; i < MAX_DATA; i++) { - Mutation m = new Mutation(String.format("r:%04d", i)); - m.put("", "", "" + i); - bw.addMutation(m); - } - } - client.tableOperations().flush(tableName, null, null, true); - IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); - // make sure iterator options make it to compactor process - iterSetting.addOption("modulus", 17 + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true); - client.tableOperations().compact(tableName, config); - - // Insert 2 * MAX_DATA rows - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = MAX_DATA; i < MAX_DATA * 2; i++) { - Mutation m = new Mutation(String.format("r:%04d", i)); - m.put("", "", "" + i); - bw.addMutation(m); - } - } - // this should create an F file - client.tableOperations().flush(tableName, null, null, true); - - // ELASTICITY_TODO compactions flush tablets, needs to evaluate this behavior - - // run a compaction that only compacts F files - iterSetting = new IteratorSetting(100, TestFilter.class); - // compact F file w/ different modulus and user pmodulus option for partial compaction - iterSetting.addOption("pmodulus", 19 + ""); - config = new CompactionConfig().setIterators(List.of(iterSetting)).setWait(true) - .setSelector(new PluginConfig(FSelector.class.getName())); - client.tableOperations().compact(tableName, config); - assertNoCompactionMetadata(tableName); - - try (Scanner scanner = client.createScanner(tableName)) { - int count = 0; - for (Entry<Key,Value> entry : scanner) { - - int v = Integer.parseInt(entry.getValue().toString()); - int modulus = v < MAX_DATA ? 17 : 19; - - assertEquals(0, Integer.parseInt(entry.getValue().toString()) % modulus, - String.format("%s %s %d != 0", entry.getValue(), "%", modulus)); - count++; - } - - // Verify - int expectedCount = 0; - for (int i = 0; i < MAX_DATA * 2; i++) { - int modulus = i < MAX_DATA ? 17 : 19; - if (i % modulus == 0) { - expectedCount++; - } - } - assertEquals(expectedCount, count); - } - - } - } - - @Test - public void testConfigurer() throws Exception { - String tableName = this.getUniqueNames(1)[0]; - - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - - Map<String,String> props = Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - client.tableOperations().create(tableName, ntc); - - byte[] data = new byte[100000]; - generateConfigurerTestData(tableName, client, data); - - // without compression, expect file to be large - long sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue(sizes > data.length * 10 && sizes < data.length * 11, - "Unexpected files sizes : " + sizes); - - client.tableOperations().compact(tableName, - new CompactionConfig().setWait(true) - .setConfigurer(new PluginConfig(CompressionConfigurer.class.getName(), - Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz", - CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, data.length + "")))); - assertNoCompactionMetadata(tableName); - - // after compacting with compression, expect small file - sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue(sizes < data.length, - "Unexpected files sizes: data: " + data.length + ", file:" + sizes); - - client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - assertNoCompactionMetadata(tableName); - - // after compacting without compression, expect big files again - sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue(sizes > data.length * 10 && sizes < data.length * 11, - "Unexpected files sizes : " + sizes); - - } - } - - @Test - public void testConfigurerSetOnTable() throws Exception { - String tableName = this.getUniqueNames(1)[0]; - - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - - byte[] data = new byte[100000]; - - Map<String, - String> props = Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none", - Property.TABLE_COMPACTION_CONFIGURER.getKey(), CompressionConfigurer.class.getName(), - Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() - + CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, - "gz", Property.TABLE_COMPACTION_CONFIGURER_OPTS.getKey() - + CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, - "" + data.length); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - client.tableOperations().create(tableName, ntc); - - generateConfigurerTestData(tableName, client, data); - - // without compression, expect file to be large - long sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue(sizes > data.length * 10 && sizes < data.length * 11, - "Unexpected files sizes : " + sizes); - - client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - - // after compacting with compression, expect small file - sizes = CompactionExecutorIT.getFileSizes(client, tableName); - assertTrue(sizes < data.length, - "Unexpected files sizes: data: " + data.length + ", file:" + sizes); - - assertNoCompactionMetadata(tableName); - - } - } - - private static void generateConfigurerTestData(String tableName, AccumuloClient client, - byte[] data) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Arrays.fill(data, (byte) 65); - try (var writer = client.createBatchWriter(tableName)) { - for (int row = 0; row < 10; row++) { - Mutation m = new Mutation(row + ""); - m.at().family("big").qualifier("stuff").put(data); - writer.addMutation(m); - } - } - client.tableOperations().flush(tableName, null, null, true); - } - @Test public void testSuccessfulCompaction() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -699,7 +286,7 @@ public class CompactionIT extends AccumuloClusterHarness { var finalCount = countFiles(c); assertTrue(finalCount <= beforeCount); - assertNoCompactionMetadata(tableName); + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), tableName); } } @@ -723,7 +310,7 @@ public class CompactionIT extends AccumuloClusterHarness { assertEquals(Set.of("a", "b"), getRows(c, tableName)); - assertNoCompactionMetadata(tableName); + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), tableName); } } @@ -882,22 +469,6 @@ public class CompactionIT extends AccumuloClusterHarness { } } - private void assertNoCompactionMetadata(String tableName) { - var tableId = TableId.of(getServerContext().tableOperations().tableIdMap().get(tableName)); - var tabletsMetadata = getServerContext().getAmple().readTablets().forTable(tableId).build(); - - int count = 0; - - for (var tabletMetadata : tabletsMetadata) { - assertEquals(Set.of(), tabletMetadata.getCompacted()); - assertNull(tabletMetadata.getSelectedFiles()); - assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); - count++; - } - - assertTrue(count > 0); - } - @Test public void testMetadataCompactions() throws Exception { // The metadata and root table have default config that causes them to compact down to one @@ -1044,18 +615,6 @@ public class CompactionIT extends AccumuloClusterHarness { } } - private void writeRandomValue(AccumuloClient c, String tableName, int size) throws Exception { - byte[] data1 = new byte[size]; - RANDOM.get().nextBytes(data1); - - try (BatchWriter bw = c.createBatchWriter(tableName)) { - Mutation m1 = new Mutation("r" + RANDOM.get().nextInt(909090)); - m1.put("data", "bl0b", new Value(data1)); - bw.addMutation(m1); - } - c.tableOperations().flush(tableName, null, null, true); - } - private Set<String> getRows(AccumuloClient c, String tableName) throws TableNotFoundException { Set<String> rows = new HashSet<>(); try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {