This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 7cf5c73222 Modified coordinator restart test to test compaction duration update (#4766) 7cf5c73222 is described below commit 7cf5c732229894034ed05e177db587e4bb89c263 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Jul 29 07:59:55 2024 -0400 Modified coordinator restart test to test compaction duration update (#4766) Closes #4680 --- .../coordinator/CompactionCoordinator.java | 5 +- .../test/compaction/ExternalCompaction_3_IT.java | 115 +++++++++++++++------ 2 files changed, 86 insertions(+), 34 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 355d5a8a5e..34e9d9191b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -148,6 +148,9 @@ public class CompactionCoordinator private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); + public static final String RESTART_UPDATE_MSG = + "Coordinator restarted, compaction found in progress"; + /* * Map of compactionId to RunningCompactions. This is an informational cache of what external * compactions may be running. Its possible it may contain external compactions that are not @@ -301,7 +304,7 @@ public class CompactionCoordinator running.forEach(rc -> { TCompactionStatusUpdate update = new TCompactionStatusUpdate(); update.setState(TCompactionState.IN_PROGRESS); - update.setMessage("Coordinator restarted, compaction found in progress"); + update.setMessage(RESTART_UPDATE_MSG); rc.addUpdate(System.currentTimeMillis(), update); RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); }); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java index b1d7128b59..6d80c970e3 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -22,7 +22,6 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getLastState; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; @@ -31,29 +30,40 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.FindCompactionTmpFiles; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -76,9 +86,6 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction3Config()); - getCluster().getClusterControl().stop(ServerType.COMPACTOR); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); } @Test @@ -88,6 +95,10 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + getCluster().getClusterControl().stop(ServerType.COMPACTOR); + getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, + ExternalDoNothingCompactor.class); + createTable(client, table1, "cs1", 2); // set compaction ratio to 1 so that majc occurs naturally, not user compaction // user compaction blocks merge @@ -143,6 +154,10 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0); + } finally { + getCluster().getClusterControl().stop(ServerType.COMPACTOR); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + } } @@ -155,6 +170,12 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { createTable(client, table1, "cs2", 2); writeData(client, table1); + + IteratorSetting setting = new IteratorSetting(50, "slow", SlowIterator.class); + SlowIterator.setSeekSleepTime(setting, 5000); + SlowIterator.setSleepTime(setting, 5000); + client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); + compact(client, table1, 2, GROUP2, false); TableId tid = getCluster().getServerContext().getTableId(table1); @@ -163,44 +184,33 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { Set<ExternalCompactionId> ecids = waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid); + ServerContext ctx = getCluster().getServerContext(); + + // Wait for all compactions to start + Map<ExternalCompactionId,RunningCompactionInfo> originalRunningInfo = null; + do { + originalRunningInfo = getRunningCompactionInformation(ctx, ecids); + } while (originalRunningInfo == null + || originalRunningInfo.values().stream().allMatch(rci -> rci.duration == 0)); + // Stop the Manager (Coordinator) getCluster().getClusterControl().stop(ServerType.MANAGER); // Restart the Manager while the compaction is running getCluster().getClusterControl().start(ServerType.MANAGER); - ServerContext ctx = getCluster().getServerContext(); + Map<ExternalCompactionId,RunningCompactionInfo> postRestartRunningInfo = + getRunningCompactionInformation(ctx, ecids); - // Confirm compaction is still running - int matches = 0; - while (matches == 0) { - TExternalCompactionList running = null; - while (running == null) { - try { - Optional<HostAndPort> coordinatorHost = - ExternalCompactionUtil.findCompactionCoordinator(ctx); - if (coordinatorHost.isEmpty()) { - throw new TTransportException( - "Unable to get CompactionCoordinator address from ZooKeeper"); - } - running = getRunningCompactions(ctx, coordinatorHost); - } catch (TException t) { - running = null; - Thread.sleep(2000); - } - } - if (running.getCompactions() != null) { - for (ExternalCompactionId ecid : ecids) { - TExternalCompaction tec = running.getCompactions().get(ecid.canonical()); - if (tec != null && tec.getUpdates() != null && !tec.getUpdates().isEmpty()) { - matches++; - assertEquals(TCompactionState.IN_PROGRESS, getLastState(tec)); - } - } + for (Entry<ExternalCompactionId,RunningCompactionInfo> post : postRestartRunningInfo + .entrySet()) { + if (originalRunningInfo.containsKey(post.getKey())) { + assertTrue( + (post.getValue().duration - originalRunningInfo.get(post.getKey()).duration) > 0); } - UtilWaitThread.sleep(250); + final String lastState = post.getValue().status; + assertTrue(lastState.equals(TCompactionState.IN_PROGRESS.name())); } - assertTrue(matches > 0); // We need to cancel the compaction or delete the table here because we initiate a user // compaction above in the test. Even though the external compaction was cancelled @@ -210,4 +220,43 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { } } + private Map<ExternalCompactionId,RunningCompactionInfo> getRunningCompactionInformation( + ServerContext ctx, Set<ExternalCompactionId> ecids) throws InterruptedException { + + final Map<ExternalCompactionId,RunningCompactionInfo> results = new HashMap<>(); + + while (results.isEmpty()) { + TExternalCompactionList running = null; + while (running == null || running.getCompactions() == null) { + try { + Optional<HostAndPort> coordinatorHost = + ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException( + "Unable to get CompactionCoordinator address from ZooKeeper"); + } + running = getRunningCompactions(ctx, coordinatorHost); + } catch (TException t) { + running = null; + Thread.sleep(2000); + } + } + for (ExternalCompactionId ecid : ecids) { + final TExternalCompaction tec = running.getCompactions().get(ecid.canonical()); + if (tec != null && tec.getUpdatesSize() > 0) { + // When the coordinator restarts it inserts a message into the updates. If this + // is the last message, then don't insert this into the results. We want to get + // an actual update from the Compactor. + TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(tec.getUpdates()); + var lastEntry = sorted.lastEntry(); + if (lastEntry.getValue().getMessage().equals(CompactionCoordinator.RESTART_UPDATE_MSG)) { + continue; + } + results.put(ecid, new RunningCompactionInfo(tec)); + } + } + } + return results; + } + }