This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 990c6edebb interrupts compactions and adds IT to verify (#5395)
990c6edebb is described below

commit 990c6edebb001d5212c354f9e0698cba7713c330
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 13 12:16:05 2025 -0400

    interrupts compactions and adds IT to verify (#5395)

    When a tablet is closed while a compaction is running, the tablet will now
    call Thread.interrupt() on the thread running the compaction.
---
 .../accumulo/server/compaction/CompactionInfo.java |   2 +-
 .../accumulo/server/compaction/FileCompactor.java  |  49 +++++++--
 .../accumulo/tserver/tablet/CompactableUtils.java  |   4 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |   3 +-
 .../accumulo/test/functional/CompactionIT.java     | 109 +++++++++++++++++++++
 .../accumulo/test/functional/SlowIterator.java     |  29 +++++-
 6 files changed, 181 insertions(+), 15 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index b505c38cb9..35bf574fb1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -65,7 +65,7 @@ public class CompactionInfo {
   }
 
   public Thread getThread() {
-    return compactor.thread;
+    return compactor.getThread();
   }
 
   public String getOutputFile() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 2645962887..667c486f6d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
 
 import io.opentelemetry.api.trace.Span;
@@ -141,13 +142,48 @@ public class FileCompactor implements Callable<CompactionStats> {
 
   // a unique id to identify a compactor
   private final long compactorID = nextCompactorID.getAndIncrement();
-  protected volatile Thread thread;
+  private volatile Thread thread;
   private final ServerContext context;
 
   private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
 
-  public void interrupt() {
+  public synchronized void interrupt() {
     interruptFlag.set(true);
+
+    if (thread != null) {
+      // Never want to interrupt the thread after clearThread was called as the thread could have
+      // moved on to something completely different than the compaction. This method and clearThread
+      // being synchronized and clearThread setting thread to null prevent this.
+      thread.interrupt();
+    }
+  }
+
+  private class ThreadClearer implements AutoCloseable {
+    @Override
+    public void close() throws InterruptedException {
+      clearThread();
+    }
+  }
+
+  private synchronized ThreadClearer setThread() {
+    thread = Thread.currentThread();
+    return new ThreadClearer();
+  }
+
+  private synchronized void clearThread() throws InterruptedException {
+    Preconditions.checkState(thread == Thread.currentThread());
+    thread = null;
+    // If the thread was interrupted during compaction do not want to allow the thread to continue
+    // w/ the interrupt status set as this could impact code unrelated to the compaction. For
+    // internal compactions the thread will execute metadata update code after the compaction and
+    // would not want the interrupt status set for that.
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+  }
+
+  Thread getThread() {
+    return thread;
   }
 
   public long getCompactorID() {
@@ -272,7 +308,8 @@
   }
 
   @Override
-  public CompactionStats call() throws IOException, CompactionCanceledException {
+  public CompactionStats call()
+      throws IOException, CompactionCanceledException, InterruptedException {
 
     FileSKVWriter mfw = null;
 
@@ -290,8 +327,9 @@
     String newThreadName = "MajC compacting " + extent + " started " + threadStartDate
         + " file: " + outputFile;
     Thread.currentThread().setName(newThreadName);
-    thread = Thread.currentThread();
-    try {
+    // Use try w/ resources for clearing the thread instead of finally because clearing may throw an
+    // exception. Java's handling of exceptions thrown in finally blocks is not good.
+    try (var ignored = setThread()) {
       FileOperations fileFactory = FileOperations.getInstance();
 
       FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
@@ -374,7 +412,6 @@
     } finally {
       Thread.currentThread().setName(oldThreadName);
       if (remove) {
-        thread = null;
         runningCompactions.remove(this);
       }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 1a670c81d1..56a2006456 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -560,7 +560,7 @@ public class CompactableUtils {
   static CompactionStats compact(Tablet tablet, CompactionJob job,
       CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
       Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
-      throws IOException, CompactionCanceledException {
+      throws IOException, CompactionCanceledException, InterruptedException {
     TableConfiguration tableConf = tablet.getTableConfiguration();
 
     AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
@@ -576,7 +576,7 @@ public class CompactableUtils {
       }
     };
     final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor()
-        .scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS);
+        .scheduleWithFixedDelay(compactionCancellerTask, 3, 3, TimeUnit.SECONDS);
     try {
       return compactor.call();
     } finally {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index facc5024b3..71258af11d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -137,7 +137,7 @@ public class MinorCompactor extends FileCompactor {
         log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName,
             e);
         reportedProblem = true;
         retryCounter++;
-      } catch (CompactionCanceledException e) {
+      } catch (CompactionCanceledException | InterruptedException e) {
         throw new IllegalStateException(e);
       }
 
@@ -161,7 +161,6 @@
       } while (true);
 
     } finally {
-      thread = null;
       runningCompactions.remove(this);
     }
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 56e57bac33..fe79b5e8ef 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -37,11 +37,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,11 +90,14 @@ import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
 import org.apache.accumulo.test.compaction.CompactionExecutorIT;
 import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -797,6 +804,108 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testMigrationCancelCompaction() throws Exception {
+
+    // This test creates 40 tablets w/ slow iterator, causes 40 compactions to start, and then
+    // starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel
+    // their compaction. Because the test uses a slow iterator, if close blocks on compaction then
+    // the test should time out. Two tables are used to have different iterator settings in order to
+    // test the two different ways compactions can be canceled. Compactions can be canceled by thread
+    // interrupt or by a check that is done after a compaction iterator returns a key value.
+
+    final String[] tables = this.getUniqueNames(2);
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.instanceOperations().setProperty(
+          Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
+          "[{'name':'any','numThreads':20}]".replaceAll("'", "\""));
+
+      SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000))
+          .map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+
+      // This iterator is intended to cover the case of a compaction being canceled by thread
+      // interrupt.
+      IteratorSetting setting1 = new IteratorSetting(50, "sleepy", SlowIterator.class);
+      setting1.addOption("sleepTime", "300000");
+      setting1.addOption("seekSleepTime", "3000");
+      SlowIterator.sleepUninterruptibly(setting1, false);
+
+      client.tableOperations().create(tables[0], new NewTableConfiguration().withSplits(splits)
+          .attachIterator(setting1, EnumSet.of(IteratorScope.majc)));
+
+      // This iterator is intended to cover the case of compaction being canceled by the check after
+      // a key value is returned. The iterator is configured to ignore interrupts.
+      IteratorSetting setting2 = new IteratorSetting(50, "sleepy", SlowIterator.class);
+      setting2.addOption("sleepTime", "2000");
+      setting2.addOption("seekSleepTime", "2000");
+      SlowIterator.sleepUninterruptibly(setting2, true);
+
+      client.tableOperations().create(tables[1], new NewTableConfiguration().withSplits(splits)
+          .attachIterator(setting2, EnumSet.of(IteratorScope.majc)));
+
+      // write files to each tablet, should cause compactions to start
+      for (var table : tables) {
+        for (int round = 0; round < 5; round++) {
+          try (var writer = client.createBatchWriter(table)) {
+            for (int i = 0; i < 20_000; i++) {
+              Mutation m = new Mutation(String.format("%06d", i));
+              m.put("f", "q", "v");
+              writer.addMutation(m);
+            }
+          }
+          client.tableOperations().flush(table, null, null, true);
+        }
+      }
+
+      assertEquals(2, client.instanceOperations().getTabletServers().size());
+
+      var ctx = (ClientContext) client;
+      var tableId1 = ctx.getTableId(tables[0]);
+      var tableId2 = ctx.getTableId(tables[1]);
+
+      Wait.waitFor(() -> {
+        var runningCompactions = client.instanceOperations().getActiveCompactions().stream()
+            .map(ac -> ac.getTablet().getTable())
+            .filter(tid -> tid.equals(tableId1) || tid.equals(tableId2)).count();
+        log.debug("Running compactions {}", runningCompactions);
+        return runningCompactions == 40;
+      });
+
+      ((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
+      getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost");
+
+      Wait.waitFor(() -> {
+        var servers = client.instanceOperations().getTabletServers().size();
+        log.debug("Server count {}", servers);
+        return 3 == servers;
+      });
+
+      Wait.waitFor(() -> {
+        try (var tablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+            .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
+          Map<String,Long> counts = new HashMap<>();
+          for (var tablet : tablets) {
+            if (!tablet.getTableId().equals(tableId1) && !tablet.getTableId().equals(tableId2)) {
+              continue;
+            }
+
+            if (tablet.getLocation() != null
+                && tablet.getLocation().getType() == TabletMetadata.LocationType.CURRENT) {
+              counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum);
+            }
+          }
+
+          var total = counts.values().stream().mapToLong(l -> l).sum();
+          var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
+          var max = counts.values().stream().mapToLong(l -> l).max().orElse(100);
+          var serversSeen = counts.keySet();
+          log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen);
+          return total == 40 && min == 12 && max == 14 && serversSeen.size() == 3;
+        }
+      });
+    }
+  }
+
   /**
    * Counts the number of tablets and files in a table.
   */
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index 714ace03bf..d9c8ddca12 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -33,14 +31,17 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.util.UtilWaitThread;
 
 public class SlowIterator extends WrappingIterator {
 
   private static final String SLEEP_TIME = "sleepTime";
   private static final String SEEK_SLEEP_TIME = "seekSleepTime";
+  private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly";
 
   private long sleepTime = 0;
   private long seekSleepTime = 0;
+  private boolean sleepUninterruptibly = true;
 
   public static void setSleepTime(IteratorSetting is, long millis) {
     is.addOption(SLEEP_TIME, Long.toString(millis));
@@ -50,6 +51,22 @@ public class SlowIterator extends WrappingIterator {
     is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
   }
 
+  public static void sleepUninterruptibly(IteratorSetting is, boolean b) {
+    is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b));
+  }
+
+  private void sleep(long time) throws IOException {
+    if (sleepUninterruptibly) {
+      UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS);
+    } else {
+      try {
+        Thread.sleep(time);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
@@ -57,14 +74,14 @@
 
   @Override
   public void next() throws IOException {
-    sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+    sleep(sleepTime);
     super.next();
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
       throws IOException {
-    sleepUninterruptibly(seekSleepTime, TimeUnit.MILLISECONDS);
+    sleep(seekSleepTime);
     super.seek(range, columnFamilies, inclusive);
   }
 
@@ -79,6 +96,10 @@
     if (options.containsKey(SEEK_SLEEP_TIME)) {
       seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
     }
+
+    if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) {
+      sleepUninterruptibly = Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY));
+    }
   }
 }
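
For readers skimming the diff, the core of the change is the setThread()/clearThread()/interrupt()
pattern added to FileCompactor. Below is a minimal standalone sketch of that pattern under the same
idea; the class and method names are illustrative only and are not Accumulo APIs, and it omits the
Preconditions check and the runningCompactions bookkeeping from the real FileCompactor. It shows
why interrupt() only delivers an interrupt while the worker thread is registered, and why the
interrupt status is cleared (and surfaced as an exception) before the thread moves on to unrelated
work.

import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleTask {

  private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
  private volatile Thread thread;

  // Called from another thread (e.g. when a tablet closes). The interrupt is only delivered while
  // the worker thread is registered; because this method, setThread(), and clearThread() are all
  // synchronized, the worker can never be interrupted after it has moved on to other work.
  public synchronized void interrupt() {
    interruptFlag.set(true);
    if (thread != null) {
      thread.interrupt();
    }
  }

  private synchronized AutoCloseable setThread() {
    thread = Thread.currentThread();
    // Returned as an AutoCloseable so the caller can clear the registration with
    // try-with-resources; clearing may throw, which a finally block would handle poorly.
    return this::clearThread;
  }

  private synchronized void clearThread() throws InterruptedException {
    thread = null;
    // Do not let the interrupt status leak into whatever the thread does next;
    // clear it and surface it as an exception instead.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
  }

  // The work loop; it periodically checks the flag and is also woken early by Thread.interrupt().
  public void run() throws Exception {
    try (AutoCloseable ignored = setThread()) {
      while (!interruptFlag.get()) {
        Thread.sleep(100); // stands in for one unit of compaction work
      }
    }
  }

  public static void main(String[] args) throws Exception {
    InterruptibleTask task = new InterruptibleTask();
    Thread worker = new Thread(() -> {
      try {
        task.run();
      } catch (Exception e) {
        System.out.println("task stopped: " + e);
      }
    });
    worker.start();
    Thread.sleep(300);
    task.interrupt(); // wakes the sleeping worker immediately instead of waiting for a flag check
    worker.join();
  }
}

Run as-is, the worker stops as soon as interrupt() is called rather than sleeping out its full
interval, which is the behavior the new testMigrationCancelCompaction IT exercises for the
interruptible SlowIterator configuration.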