This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 8bf0970c7f fixes issues related to deleting a compaction service (#4211) 8bf0970c7f is described below commit 8bf0970c7f1ca9cf37a16150048f983f6959f1d0 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Feb 1 17:01:01 2024 -0500 fixes issues related to deleting a compaction service (#4211) The compaction manager was not stopping deleted compaction services. This commit fixes that and adds a test that deletes a compaction service. --- .../tserver/compactions/CompactionManager.java | 8 +- .../tserver/compactions/CompactionService.java | 1 + .../accumulo/test/functional/CompactionIT.java | 89 ++++++++++++++++++++++ 3 files changed, 94 insertions(+), 4 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index be872a6ed9..4a244c4785 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -276,13 +276,13 @@ public class CompactionManager { } }); - var deletedServices = - Sets.difference(currentCfg.getPlanners().keySet(), tmpCfg.getPlanners().keySet()); + var deletedServices = Sets.difference(services.keySet(), tmpServices.keySet()); - for (String serviceName : deletedServices) { - services.get(CompactionServiceId.of(serviceName)).stop(); + for (var dcsid : deletedServices) { + services.get(dcsid).stop(); } + this.currentCfg = tmpCfg; this.services = Map.copyOf(tmpServices); HashSet<CompactionExecutorId> activeExternalExecs = new HashSet<>(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index e7bc18403d..78f5fc1173 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -444,6 +444,7 @@ public class CompactionService { public void stop() { executors.values().forEach(CompactionExecutor::stop); + log.debug("Stopped compaction service {}", myId); } int getCompactionsRunning(CType ctype) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index ec044816b2..93c8c1713e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -30,6 +30,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -76,6 +77,8 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; @@ -625,6 +628,92 @@ public class CompactionIT extends AccumuloClusterHarness { } } + @Test + public void testDeleteCompactionService() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var uniqueNames = getUniqueNames(2); + String table1 = uniqueNames[0]; + String table2 = uniqueNames[1]; + + // create a compaction service named deleteme + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner", + DefaultCompactionPlanner.class.getName()); + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors", + "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\"")); + + // create a compaction service named keepme + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner", + DefaultCompactionPlanner.class.getName()); + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors", + "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\"")); + + // create a table that uses the compaction service deleteme + Map<String,String> props = new HashMap<>(); + props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), + SimpleCompactionDispatcher.class.getName()); + props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme"); + c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props)); + + // create a table that uses the compaction service keepme + props.clear(); + props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), + SimpleCompactionDispatcher.class.getName()); + props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme"); + c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props)); + + try (var writer1 = c.createBatchWriter(table1); var writer2 = c.createBatchWriter(table2)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation("" + i); + m.put("f", "q", "" + i); + writer1.addMutation(m); + writer2.addMutation(m); + } + } + + c.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + c.tableOperations().compact(table2, new CompactionConfig().setWait(true)); + + // delete the compaction service deleteme + c.instanceOperations() + .removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner"); + c.instanceOperations().removeProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors"); + + // add a new compaction service named newcs + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner", + DefaultCompactionPlanner.class.getName()); + c.instanceOperations().setProperty( + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", + "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\"")); + + // set table 1 to a compaction service newcs + c.tableOperations().setProperty(table1, + Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs"); + + // ensure tables can still compact and are not impacted by the deleted compaction service + for (int i = 0; i < 10; i++) { + c.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + c.tableOperations().compact(table2, new CompactionConfig().setWait(true)); + + try (var scanner = c.createScanner(table1)) { + assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue) + .mapToInt(v -> Integer.parseInt(v.toString())).sum()); + } + try (var scanner = c.createScanner(table2)) { + assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue) + .mapToInt(v -> Integer.parseInt(v.toString())).sum()); + } + + Thread.sleep(100); + } + } + } + private int countFiles(AccumuloClient c) throws Exception { try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));