This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 990c6edebb interrupts compactions and adds IT to verify (#5395)
990c6edebb is described below

commit 990c6edebb001d5212c354f9e0698cba7713c330
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 13 12:16:05 2025 -0400

    interrupts compactions and adds IT to verify (#5395)
    
    When a tablet is closed while a compaction is running, Thread.interrupt()
    is now called on the thread running the compaction.
---
 .../accumulo/server/compaction/CompactionInfo.java |   2 +-
 .../accumulo/server/compaction/FileCompactor.java  |  49 +++++++--
 .../accumulo/tserver/tablet/CompactableUtils.java  |   4 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |   3 +-
 .../accumulo/test/functional/CompactionIT.java     | 109 +++++++++++++++++++++
 .../accumulo/test/functional/SlowIterator.java     |  29 +++++-
 6 files changed, 181 insertions(+), 15 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index b505c38cb9..35bf574fb1 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -65,7 +65,7 @@ public class CompactionInfo {
   }
 
   public Thread getThread() {
-    return compactor.thread;
+    return compactor.getThread();
   }
 
   public String getOutputFile() {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 2645962887..667c486f6d 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
 
 import io.opentelemetry.api.trace.Span;
@@ -141,13 +142,48 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
   // a unique id to identify a compactor
   private final long compactorID = nextCompactorID.getAndIncrement();
-  protected volatile Thread thread;
+  private volatile Thread thread;
   private final ServerContext context;
 
   private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
 
-  public void interrupt() {
+  public synchronized void interrupt() {
     interruptFlag.set(true);
+
+    if (thread != null) {
+      // Never want to interrupt the thread after clearThread was called as 
the thread could have
+      // moved on to something completely different than the compaction. This 
method and clearThread
+      // being synchronized and clearThread setting thread to null prevent 
this.
+      thread.interrupt();
+    }
+  }
+
+  private class ThreadClearer implements AutoCloseable {
+    @Override
+    public void close() throws InterruptedException {
+      clearThread();
+    }
+  }
+
+  private synchronized ThreadClearer setThread() {
+    thread = Thread.currentThread();
+    return new ThreadClearer();
+  }
+
+  private synchronized void clearThread() throws InterruptedException {
+    Preconditions.checkState(thread == Thread.currentThread());
+    thread = null;
+    // If the thread was interrupted during compaction do not want to allow 
the thread to continue
+    // w/ the interrupt status set as this could impact code unrelated to the 
compaction. For
+    // internal compactions the thread will execute metadata update code after 
the compaction and
+    // would not want the interrupt status set for that.
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+  }
+
+  Thread getThread() {
+    return thread;
   }
 
   public long getCompactorID() {
@@ -272,7 +308,8 @@ public class FileCompactor implements 
Callable<CompactionStats> {
   }
 
   @Override
-  public CompactionStats call() throws IOException, 
CompactionCanceledException {
+  public CompactionStats call()
+      throws IOException, CompactionCanceledException, InterruptedException {
 
     FileSKVWriter mfw = null;
 
@@ -290,8 +327,9 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     String newThreadName =
         "MajC compacting " + extent + " started " + threadStartDate + " file: 
" + outputFile;
     Thread.currentThread().setName(newThreadName);
-    thread = Thread.currentThread();
-    try {
+    // Use try w/ resources for clearing the thread instead of finally because 
clearing may throw an
+    // exception. Java's handling of exceptions thrown in finally blocks is 
not good.
+    try (var ignored = setThread()) {
       FileOperations fileFactory = FileOperations.getInstance();
       FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
 
@@ -374,7 +412,6 @@ public class FileCompactor implements 
Callable<CompactionStats> {
     } finally {
       Thread.currentThread().setName(oldThreadName);
       if (remove) {
-        thread = null;
         runningCompactions.remove(this);
       }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 1a670c81d1..56a2006456 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -560,7 +560,7 @@ public class CompactableUtils {
   static CompactionStats compact(Tablet tablet, CompactionJob job,
       CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
       Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
-      throws IOException, CompactionCanceledException {
+      throws IOException, CompactionCanceledException, InterruptedException {
     TableConfiguration tableConf = tablet.getTableConfiguration();
 
     AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
@@ -576,7 +576,7 @@ public class CompactableUtils {
       }
     };
     final ScheduledFuture<?> future = 
tablet.getContext().getScheduledExecutor()
-        .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, 
TimeUnit.SECONDS);
+        .scheduleWithFixedDelay(compactionCancellerTask, 3, 3, 
TimeUnit.SECONDS);
     try {
       return compactor.call();
     } finally {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index facc5024b3..71258af11d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -137,7 +137,7 @@ public class MinorCompactor extends FileCompactor {
           log.warn("MinC failed ({}) to create {} retrying ...", 
e.getMessage(), outputFileName, e);
           reportedProblem = true;
           retryCounter++;
-        } catch (CompactionCanceledException e) {
+        } catch (CompactionCanceledException | InterruptedException e) {
           throw new IllegalStateException(e);
         }
 
@@ -161,7 +161,6 @@ public class MinorCompactor extends FileCompactor {
 
       } while (true);
     } finally {
-      thread = null;
       runningCompactions.remove(this);
     }
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 56e57bac33..fe79b5e8ef 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -37,11 +37,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,11 +90,14 @@ import 
org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
 import org.apache.accumulo.test.compaction.CompactionExecutorIT;
 import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -797,6 +804,108 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testMigrationCancelCompaction() throws Exception {
+
+    // This test creates 40 tablets w/ slow iterator, causes 40 compactions to 
start, and then
+    // starts a new tablet server. Some of the tablets should migrate to the 
new tserver and cancel
+    // their compaction. Because the test uses a slow iterator, if close 
blocks on compaction then
+    // the test should time out. Two tables are used to have different iterator 
settings in order to
+    // test the two different ways compactions can be canceled. Compactions can 
be canceled by thread
+    // interrupt or by a check that is done after a compaction iterator 
returns a key value.
+
+    final String[] tables = this.getUniqueNames(2);
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
+          "[{'name':'any','numThreads':20}]".replaceAll("'", "\""));
+
+      SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> 
String.format("%06d", i * 1000))
+          .map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+
+      // This iterator is intended to cover the case of a compaction being 
canceled by thread
+      // interrupt.
+      IteratorSetting setting1 = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+      setting1.addOption("sleepTime", "300000");
+      setting1.addOption("seekSleepTime", "3000");
+      SlowIterator.sleepUninterruptibly(setting1, false);
+
+      client.tableOperations().create(tables[0], new 
NewTableConfiguration().withSplits(splits)
+          .attachIterator(setting1, EnumSet.of(IteratorScope.majc)));
+
+      // This iterator is intended to cover the case of compaction being 
canceled by the check after
+      // a key value is returned. The iterator is configured to ignore 
interrupts.
+      IteratorSetting setting2 = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+      setting2.addOption("sleepTime", "2000");
+      setting2.addOption("seekSleepTime", "2000");
+      SlowIterator.sleepUninterruptibly(setting2, true);
+
+      client.tableOperations().create(tables[1], new 
NewTableConfiguration().withSplits(splits)
+          .attachIterator(setting2, EnumSet.of(IteratorScope.majc)));
+
+      // write files to each tablet, should cause compactions to start
+      for (var table : tables) {
+        for (int round = 0; round < 5; round++) {
+          try (var writer = client.createBatchWriter(table)) {
+            for (int i = 0; i < 20_000; i++) {
+              Mutation m = new Mutation(String.format("%06d", i));
+              m.put("f", "q", "v");
+              writer.addMutation(m);
+            }
+          }
+          client.tableOperations().flush(table, null, null, true);
+        }
+      }
+
+      assertEquals(2, client.instanceOperations().getTabletServers().size());
+
+      var ctx = (ClientContext) client;
+      var tableId1 = ctx.getTableId(tables[0]);
+      var tableId2 = ctx.getTableId(tables[1]);
+
+      Wait.waitFor(() -> {
+        var runningCompactions = 
client.instanceOperations().getActiveCompactions().stream()
+            .map(ac -> ac.getTablet().getTable())
+            .filter(tid -> tid.equals(tableId1) || 
tid.equals(tableId2)).count();
+        log.debug("Running compactions {}", runningCompactions);
+        return runningCompactions == 40;
+      });
+
+      ((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
+      getCluster().getClusterControl().start(ServerType.TABLET_SERVER, 
"localhost");
+
+      Wait.waitFor(() -> {
+        var servers = client.instanceOperations().getTabletServers().size();
+        log.debug("Server count {}", servers);
+        return 3 == servers;
+      });
+
+      Wait.waitFor(() -> {
+        try (var tablets = 
ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+            .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
+          Map<String,Long> counts = new HashMap<>();
+          for (var tablet : tablets) {
+            if (!tablet.getTableId().equals(tableId1) && 
!tablet.getTableId().equals(tableId2)) {
+              continue;
+            }
+
+            if (tablet.getLocation() != null
+                && tablet.getLocation().getType() == 
TabletMetadata.LocationType.CURRENT) {
+              counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum);
+            }
+          }
+
+          var total = counts.values().stream().mapToLong(l -> l).sum();
+          var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
+          var max = counts.values().stream().mapToLong(l -> 
l).max().orElse(100);
+          var serversSeen = counts.keySet();
+          log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, 
serversSeen);
+          return total == 40 && min >= 12 && max <= 14 && serversSeen.size() 
== 3;
+        }
+      });
+    }
+  }
+
   /**
    * Counts the number of tablets and files in a table.
    */
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index 714ace03bf..d9c8ddca12 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -33,14 +31,17 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.util.UtilWaitThread;
 
 public class SlowIterator extends WrappingIterator {
 
   private static final String SLEEP_TIME = "sleepTime";
   private static final String SEEK_SLEEP_TIME = "seekSleepTime";
+  private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly";
 
   private long sleepTime = 0;
   private long seekSleepTime = 0;
+  private boolean sleepUninterruptibly = true;
 
   public static void setSleepTime(IteratorSetting is, long millis) {
     is.addOption(SLEEP_TIME, Long.toString(millis));
@@ -50,6 +51,22 @@ public class SlowIterator extends WrappingIterator {
     is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
   }
 
+  public static void sleepUninterruptibly(IteratorSetting is, boolean b) {
+    is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b));
+  }
+
+  private void sleep(long time) throws IOException {
+    if (sleepUninterruptibly) {
+      UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS);
+    } else {
+      try {
+        Thread.sleep(time); // sleep for the requested duration, not always sleepTime
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
@@ -57,14 +74,14 @@ public class SlowIterator extends WrappingIterator {
 
   @Override
   public void next() throws IOException {
-    sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+    sleep(sleepTime);
     super.next();
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
       throws IOException {
-    sleepUninterruptibly(seekSleepTime, TimeUnit.MILLISECONDS);
+    sleep(seekSleepTime);
     super.seek(range, columnFamilies, inclusive);
   }
 
@@ -79,6 +96,10 @@ public class SlowIterator extends WrappingIterator {
     if (options.containsKey(SEEK_SLEEP_TIME)) {
       seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
     }
+
+    if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) {
+      sleepUninterruptibly = 
Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY));
+    }
   }
 
 }

Reply via email to