This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 35f2cf5dc4 FateOpsCommandsIT enhancements (#4400) 35f2cf5dc4 is described below commit 35f2cf5dc4ddce4b13fdbc4bf83b9d7cd1a0073e Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Tue Jun 4 11:27:45 2024 -0400 FateOpsCommandsIT enhancements (#4400) - Added test to verify name and step of transactions using summary and print commands - Added check to filter by non-existent FateId in summary and print command tests - Some other small misc improvements * Misc Changes: - Changed result.contains("0 transactions") to result.contains(" 0 transactions") to avoid returning true if the number of transactions is not 0 but ends in 0 - Added shutting down the compactor to BeforeEach (this also has the effect of adding this shutdown to testTransactionNameAndStep() which I mistakenly thought shouldn't shut down the compactor) - Simplified testTransactionNameAndStep() (unnecessary config and checks). Also added deleting the table at the end of the test. 
--- .../server/util/fateCommand/FateTxnDetails.java | 27 ++- .../accumulo/test/fate/FateOpsCommandsIT.java | 244 +++++++++++++++------ 2 files changed, 201 insertions(+), 70 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java index 66c12f81cd..0245b07c4d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java @@ -20,6 +20,7 @@ package org.apache.accumulo.server.util.fateCommand; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -74,8 +75,10 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { if (txnStatus.getFateId() != null) { fateId = txnStatus.getFateId().canonical(); } - locksHeld = formatLockInfo(txnStatus.getHeldLocks(), idsToNameMap); - locksWaiting = formatLockInfo(txnStatus.getWaitingLocks(), idsToNameMap); + locksHeld = + Collections.unmodifiableList(formatLockInfo(txnStatus.getHeldLocks(), idsToNameMap)); + locksWaiting = + Collections.unmodifiableList(formatLockInfo(txnStatus.getWaitingLocks(), idsToNameMap)); } private List<String> formatLockInfo(final List<String> lockInfo, @@ -92,6 +95,18 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { return formattedLocks; } + public long getRunning() { + return running; + } + + public String getTxName() { + return txName; + } + + public String getStep() { + return step; + } + public String getFateId() { return fateId; } @@ -100,6 +115,14 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { return status; } + public List<String> getLocksHeld() { + return locksHeld; + } + + public List<String> getLocksWaiting() { + return locksWaiting; + } + /** * Sort by 
running time in reverse (oldest txn first). txid is unique as used to break times and * so that compareTo remains consistent with hashCode and equals methods. diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 5bddd4f35c..f448149d76 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -28,12 +28,20 @@ import static org.junit.jupiter.api.Assertions.fail; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; @@ -41,15 +49,20 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; +import org.apache.accumulo.server.util.fateCommand.FateTxnDetails; import 
org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public abstract class FateOpsCommandsIT extends ConfigurableMacBase @@ -66,6 +79,15 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); } + @BeforeEach + public void shutdownCompactor() throws Exception { + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + } + @Test public void testFateSummaryCommand() throws Exception { executeTest(this::testFateSummaryCommand); @@ -75,15 +97,9 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. 
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // validate blank report, no transactions have started - ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW", "-s", - "IN_PROGRESS", "-s", "FAILED"); + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); @@ -93,10 +109,10 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertTrue(report.getStatusCounts().isEmpty()); assertTrue(report.getStepCounts().isEmpty()); assertTrue(report.getCmdCounts().isEmpty()); - assertEquals(Set.of("FAILED", "IN_PROGRESS", "NEW"), report.getStatusFilterNames()); + assertTrue(report.getStatusFilterNames().isEmpty()); assertTrue(report.getInstanceTypesFilterNames().isEmpty()); assertTrue(report.getFateIdFilter().isEmpty()); - assertEquals(0, report.getFateDetails().size()); + validateFateDetails(report.getFateDetails(), 0, null); // create Fate transactions FateId fateId1 = fate.startTransaction(); @@ -117,12 +133,11 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertTrue(report.getStatusFilterNames().isEmpty()); assertTrue(report.getInstanceTypesFilterNames().isEmpty()); assertTrue(report.getFateIdFilter().isEmpty()); - assertEquals(2, report.getFateDetails().size()); - ArrayList<String> fateIdsFromResult1 = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - fateIdsFromResult1.add(d.getFateId()); - }); - assertTrue(fateIdsFromResult1.containsAll(fateIdsStarted)); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + + /* + * Test filtering by FateIds + */ // validate filtering by both transactions p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), @@ -140,12 
+155,7 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertTrue(report.getInstanceTypesFilterNames().isEmpty()); assertEquals(2, report.getFateIdFilter().size()); assertTrue(report.getFateIdFilter().containsAll(fateIdsStarted)); - assertEquals(2, report.getFateDetails().size()); - ArrayList<String> fateIdsFromResult2 = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - fateIdsFromResult2.add(d.getFateId()); - }); - assertTrue(fateIdsFromResult2.containsAll(fateIdsStarted)); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); // validate filtering by just one transaction p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--summary", "-j"); @@ -162,12 +172,29 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertTrue(report.getInstanceTypesFilterNames().isEmpty()); assertEquals(1, report.getFateIdFilter().size()); assertTrue(report.getFateIdFilter().contains(fateId1.canonical())); - assertEquals(1, report.getFateDetails().size()); - ArrayList<String> fateIdsFromResult3 = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - fateIdsFromResult3.add(d.getFateId()); - }); - assertTrue(fateIdsFromResult3.contains(fateId1.canonical())); + validateFateDetails(report.getFateDetails(), 1, fateIdsStarted); + + // validate filtering by non-existent transaction + FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(), "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + 
assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(1, report.getFateIdFilter().size()); + assertTrue(report.getFateIdFilter().contains(fakeFateId.canonical())); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + + /* + * Test filtering by States + */ // validate status filter by including only FAILED transactions, should be none p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "FAILED"); @@ -183,7 +210,27 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertEquals(Set.of("FAILED"), report.getStatusFilterNames()); assertTrue(report.getInstanceTypesFilterNames().isEmpty()); assertTrue(report.getFateIdFilter().isEmpty()); - assertEquals(0, report.getFateDetails().size()); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + + // validate status filter by including only NEW transactions, should be 2 + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertEquals(Set.of("NEW"), report.getStatusFilterNames()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + + /* + * Test filtering by FateInstanceType + */ // validate FateInstanceType filter by only including transactions with META filter p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", "META"); @@ -200,14 +247,9 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertEquals(Set.of("META"), 
report.getInstanceTypesFilterNames()); assertTrue(report.getFateIdFilter().isEmpty()); if (store.type() == FateInstanceType.META) { - assertEquals(2, report.getFateDetails().size()); - ArrayList<String> fateIdsFromResult4 = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - fateIdsFromResult4.add(d.getFateId()); - }); - assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted)); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); } else { // USER - assertEquals(0, report.getFateDetails().size()); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); } // validate FateInstanceType filter by only including transactions with USER filter @@ -225,14 +267,9 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase assertEquals(Set.of("USER"), report.getInstanceTypesFilterNames()); assertTrue(report.getFateIdFilter().isEmpty()); if (store.type() == FateInstanceType.META) { - assertEquals(0, report.getFateDetails().size()); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); } else { // USER - assertEquals(2, report.getFateDetails().size()); - ArrayList<String> fateIdsFromResult4 = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - fateIdsFromResult4.add(d.getFateId()); - }); - assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted)); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); } fate.shutdown(10, TimeUnit.MINUTES); @@ -247,11 +284,6 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. 
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -280,17 +312,12 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. - getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // validate no transactions ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); - assertTrue(result.contains("0 transactions")); + assertTrue(result.contains(" 0 transactions")); // create Fate transactions FateId fateId1 = fate.startTransaction(); @@ -339,6 +366,14 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase fateIdsFromResult = getFateIdsFromPrint(result); assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + // Filter by non-existent FateId + FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(0, fateIdsFromResult.size()); + /* * Test filtering by FateInstanceType */ @@ -370,6 +405,73 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase fate.shutdown(10, TimeUnit.MINUTES); } + @Test + public void testTransactionNameAndStep() throws Exception { + executeTest(this::testTransactionNameAndStep); + } + 
+ protected void testTransactionNameAndStep(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Since the other tests just use NEW transactions for simplicity, there are some fields of the + // summary and print outputs which are null and not tested for (transaction name and transaction + // step). This test uses seeded/in progress transactions to test that the summary and print + // commands properly output these fields. + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + final String table = getUniqueNames(1)[0]; + + IteratorSetting is = new IteratorSetting(1, SlowIterator.class); + is.addOption("sleepTime", "10000"); + + NewTableConfiguration cfg = new NewTableConfiguration(); + cfg.attachIterator(is, EnumSet.of(IteratorUtil.IteratorScope.majc)); + client.tableOperations().create(table, cfg); + + ReadWriteIT.ingest(client, 10, 10, 10, 0, table); + client.tableOperations().flush(table, null, null, true); + + // create 2 Fate transactions + client.tableOperations().compact(table, null, null, false, false); + client.tableOperations().compact(table, null, null, false, false); + List<String> fateIdsStarted = new ArrayList<>(); + + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + + String result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + FateSummaryReport report = FateSummaryReport.fromJson(result); + + // Validate transaction name and transaction step from summary command + + for (FateTxnDetails d : report.getFateDetails()) { + assertEquals("TABLE_COMPACT", d.getTxName()); + assertEquals("CompactionDriver", d.getStep()); + fateIdsStarted.add(d.getFateId()); + } + assertEquals(2, fateIdsStarted.size()); + + p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + + // Validate transaction name and transaction 
step from print command + + String[] lines = result.split("\n"); + // Filter out the result to just include the info about the transactions + List<String> transactionInfo = Arrays.stream(lines) + .filter( + line -> line.contains(fateIdsStarted.get(0)) || line.contains(fateIdsStarted.get(1))) + .collect(Collectors.toList()); + assertEquals(2, transactionInfo.size()); + for (String info : transactionInfo) { + assertTrue(info.contains("TABLE_COMPACT")); + assertTrue(info.contains("op: CompactionDriver")); + } + + client.tableOperations().delete(table); + } + } + @Test public void testFateCancelCommand() throws Exception { executeTest(this::testFateCancelCommand); @@ -379,11 +481,6 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. - getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -417,11 +514,6 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. 
- getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -467,11 +559,6 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase throws Exception { // Configure Fate Fate<TestEnv> fate = initializeFate(store); - // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was - // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes - // this issue. - getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -548,6 +635,27 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase return fateIdToStatus; } + /** + * Validates the fate details of NEW transactions + * + * @param details the fate details from the {@link FateSummaryReport} + * @param expDetailsSize the expected size of details + * @param fateIdsStarted the list of fate ids that have been started + */ + private void validateFateDetails(Set<FateTxnDetails> details, int expDetailsSize, + List<String> fateIdsStarted) { + assertEquals(expDetailsSize, details.size()); + for (FateTxnDetails d : details) { + assertTrue(fateIdsStarted.contains(d.getFateId())); + assertEquals("NEW", d.getStatus()); + assertEquals("?", d.getStep()); + assertEquals("?", d.getTxName()); + assertNotEquals(0, d.getRunning()); + assertEquals("[]", d.getLocksHeld().toString()); + assertEquals("[]", d.getLocksWaiting().toString()); + } + } + private Fate<TestEnv> initializeFate(FateStore<TestEnv> store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2");