This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 8bf0970c7f fixes issues related to deleting a compaction service (#4211)
8bf0970c7f is described below

commit 8bf0970c7f1ca9cf37a16150048f983f6959f1d0
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 1 17:01:01 2024 -0500

    fixes issues related to deleting a compaction service (#4211)
    
    The compaction manager was not stopping deleted compaction services.
    This commit fixes that and adds a test that deletes a compaction
    service.
---
 .../tserver/compactions/CompactionManager.java     |  8 +-
 .../tserver/compactions/CompactionService.java     |  1 +
 .../accumulo/test/functional/CompactionIT.java     | 89 ++++++++++++++++++++++
 3 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index be872a6ed9..4a244c4785 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -276,13 +276,13 @@ public class CompactionManager {
           }
         });
 
-        var deletedServices =
-            Sets.difference(currentCfg.getPlanners().keySet(), tmpCfg.getPlanners().keySet());
+        var deletedServices = Sets.difference(services.keySet(), tmpServices.keySet());
 
-        for (String serviceName : deletedServices) {
-          services.get(CompactionServiceId.of(serviceName)).stop();
+        for (var dcsid : deletedServices) {
+          services.get(dcsid).stop();
         }
 
+        this.currentCfg = tmpCfg;
         this.services = Map.copyOf(tmpServices);
 
         HashSet<CompactionExecutorId> activeExternalExecs = new HashSet<>();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index e7bc18403d..78f5fc1173 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -444,6 +444,7 @@ public class CompactionService {
 
   public void stop() {
     executors.values().forEach(CompactionExecutor::stop);
+    log.debug("Stopped compaction service {}", myId);
   }
 
   int getCompactionsRunning(CType ctype) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index ec044816b2..93c8c1713e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -30,6 +30,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -76,6 +77,8 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.VerifyIngest;
@@ -625,6 +628,92 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testDeleteCompactionService() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      var uniqueNames = getUniqueNames(2);
+      String table1 = uniqueNames[0];
+      String table2 = uniqueNames[1];
+
+      // create a compaction service named deleteme
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
+          DefaultCompactionPlanner.class.getName());
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
+          "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+
+      // create a compaction service named keepme
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
+          DefaultCompactionPlanner.class.getName());
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
+          "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+
+      // create a table that uses the compaction service deleteme
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+          SimpleCompactionDispatcher.class.getName());
+      props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme");
+      c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props));
+
+      // create a table that uses the compaction service keepme
+      props.clear();
+      props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+          SimpleCompactionDispatcher.class.getName());
+      props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme");
+      c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props));
+
+      try (var writer1 = c.createBatchWriter(table1); var writer2 = c.createBatchWriter(table2)) {
+        for (int i = 0; i < 10; i++) {
+          Mutation m = new Mutation("" + i);
+          m.put("f", "q", "" + i);
+          writer1.addMutation(m);
+          writer2.addMutation(m);
+        }
+      }
+
+      c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+      c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+
+      // delete the compaction service deleteme
+      c.instanceOperations()
+          .removeProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
+      c.instanceOperations().removeProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
+
+      // add a new compaction service named newcs
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
+          DefaultCompactionPlanner.class.getName());
+      c.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
+          "[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
+
+      // set table 1 to a compaction service newcs
+      c.tableOperations().setProperty(table1,
+          Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs");
+
+      // ensure tables can still compact and are not impacted by the deleted compaction service
+      for (int i = 0; i < 10; i++) {
+        c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
+        c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
+
+        try (var scanner = c.createScanner(table1)) {
+          assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+              .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+        }
+        try (var scanner = c.createScanner(table2)) {
+          assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
+              .mapToInt(v -> Integer.parseInt(v.toString())).sum());
+        }
+
+        Thread.sleep(100);
+      }
+    }
+  }
+
   private int countFiles(AccumuloClient c) throws Exception {
     try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
       s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));

Reply via email to