This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 66d4192d7d Fixes MetaFateII (#6269)
66d4192d7d is described below

commit 66d4192d7dd0ce0b63d9d51e0f6a798b805a3d08
Author: Keith Turner <[email protected]>
AuthorDate: Fri Mar 27 14:50:44 2026 -0700

    Fixes MetaFateII (#6269)
    
    Some test methods were not shutting fate down, which left its thread
    running and would interfere with other tests.
---
 .../compaction/ExternalCompactionTestUtils.java    |   7 -
 .../org/apache/accumulo/test/fate/FateITBase.java  | 217 +++++++++++----------
 2 files changed, 113 insertions(+), 111 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 07d3ea60fb..778ca66f8c 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -75,10 +74,8 @@ import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.thrift.transport.TTransportException;
 
 import com.beust.jcommander.internal.Maps;
-import com.google.common.net.HostAndPort;
 
 public class ExternalCompactionTestUtils {
 
@@ -306,10 +303,6 @@ public class ExternalCompactionTestUtils {
   public static int confirmCompactionRunning(ServerContext ctx, 
Set<ExternalCompactionId> ecids)
       throws Exception {
     int matches = 0;
-    Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
-    if (coordinatorHost.isEmpty()) {
-      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
-    }
     while (matches == 0) {
       var running = ExternalCompactionTestUtils.getRunningCompactions(ctx);
       for (ExternalCompactionId ecid : ecids) {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
index 1411e258cc..1f79505ff2 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
@@ -580,58 +580,63 @@ public abstract class FateITBase extends SharedMiniClusterBase implements FateTe
       throws Exception {
     Fate<TestEnv> fate = initializeFate(store);
 
-    // Wait for the transaction runner to be scheduled.
-    UtilWaitThread.sleep(3000);
+    try {
+      // Wait for the transaction runner to be scheduled.
+      UtilWaitThread.sleep(3000);
 
-    callStarted = new CountDownLatch(1);
-    finishCall = new CountDownLatch(1);
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
 
-    FateId txid = fate.startTransaction();
-    assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
+      FateId txid = fate.startTransaction();
+      assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
 
-    fate.seedTransaction(TEST_FATE_OP, txid, new 
TestRepo("testShutdownDoesNotFailTx"), true,
-        "Test Op");
+      fate.seedTransaction(TEST_FATE_OP, txid, new 
TestRepo("testShutdownDoesNotFailTx"), true,
+          "Test Op");
+
+      // The Fate operation could be in a SUBMITTED state if the
+      // Fate transaction runner thread has not picked it up yet.
+      // If the thread has picked it up, then it will be in an
+      // IN_PROGRESS state, but will be waiting on the finishCall
+      // latch to be called to continue.
+      assertTrue(TStatus.SUBMITTED == getTxStatus(sctx, txid)
+          || TStatus.IN_PROGRESS == getTxStatus(sctx, txid));
+
+      // wait for call() to be called
+      callStarted.await();
+      assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+
+      // shutdown fate
+      fate.shutdown(0, SECONDS);
+
+      // tell the op to exit the method
+      Wait.waitFor(() -> interruptedException.get() != null);
+      interruptedException.set(null);
+
+      // restart fate
+      assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+      fate = initializeFate(store);
+      assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+
+      // Restarting the transaction runners will retry the in-progress
+      // transaction. Reset the CountDownLatch's to confirm.
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
+
+      callStarted.await();
+      assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+      finishCall.countDown();
+
+      // This should complete normally, cleaning up the tx and deleting it 
from ZK
+      TStatus status = getTxStatus(sctx, txid);
+      while (status != TStatus.UNKNOWN) {
+        Thread.sleep(100);
+        status = getTxStatus(sctx, txid);
+      }
+      assertNull(interruptedException.get());
+    } finally {
+      fate.shutdown(10, TimeUnit.MINUTES);
+    }
 
-    // The Fate operation could be in a SUBMITTED state if the
-    // Fate transaction runner thread has not picked it up yet.
-    // If the thread has picked it up, then it will be in an
-    // IN_PROGRESS state, but will be waiting on the finishCall
-    // latch to be called to continue.
-    assertTrue(TStatus.SUBMITTED == getTxStatus(sctx, txid)
-        || TStatus.IN_PROGRESS == getTxStatus(sctx, txid));
-
-    // wait for call() to be called
-    callStarted.await();
-    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-
-    // shutdown fate
-    fate.shutdown(0, SECONDS);
-
-    // tell the op to exit the method
-    Wait.waitFor(() -> interruptedException.get() != null);
-    interruptedException.set(null);
-
-    // restart fate
-    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-    fate = initializeFate(store);
-    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-
-    // Restarting the transaction runners will retry the in-progress
-    // transaction. Reset the CountDownLatch's to confirm.
-    callStarted = new CountDownLatch(1);
-    finishCall = new CountDownLatch(1);
-
-    callStarted.await();
-    assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-    finishCall.countDown();
-
-    // This should complete normally, cleaning up the tx and deleting it from 
ZK
-    TStatus status = getTxStatus(sctx, txid);
-    while (status != TStatus.UNKNOWN) {
-      Thread.sleep(100);
-      status = getTxStatus(sctx, txid);
-    }
-    assertNull(interruptedException.get());
   }
 
   public static class DoNothingRepo implements Repo<TestEnv> {
@@ -673,78 +678,82 @@ public abstract class FateITBase extends SharedMiniClusterBase implements FateTe
     // This test ensures that fate only processes fateids that fall within its 
assigned partitions
     // of fateids.
     Fate<TestEnv> fate = initializeFate(store);
-    fate.setPartitions(Set.of());
+    try {
+      fate.setPartitions(Set.of());
 
-    Set<FateId> fateIds = new HashSet<>();
+      Set<FateId> fateIds = new HashSet<>();
 
-    for (int i = 0; i < 100; i++) {
-      var txid = fate.startTransaction();
-      fateIds.add(txid);
+      for (int i = 0; i < 100; i++) {
+        var txid = fate.startTransaction();
+        fateIds.add(txid);
 
-      fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, "no 
goal");
-    }
+        fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, 
"no goal");
+      }
 
-    for (var fateId : fateIds) {
-      assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
-    }
+      for (var fateId : fateIds) {
+        assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+      }
 
-    // start processing all uuids that start with 1 or 5, but no other ids
-    fate.setPartitions(Set.of(newPartition(store.type(), "1"), 
newPartition(store.type(), "5")));
+      // start processing all uuids that start with 1 or 5, but no other ids
+      fate.setPartitions(Set.of(newPartition(store.type(), "1"), 
newPartition(store.type(), "5")));
 
-    Wait.waitFor(() -> fateIds.stream().filter(
-        fateId -> fateId.getTxUUIDStr().startsWith("1") || 
fateId.getTxUUIDStr().startsWith("5"))
-        .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status == 
SUCCESSFUL));
+      Wait.waitFor(() -> fateIds.stream().filter(
+          fateId -> fateId.getTxUUIDStr().startsWith("1") || 
fateId.getTxUUIDStr().startsWith("5"))
+          .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status 
== SUCCESSFUL));
 
-    for (var fateId : fateIds) {
-      var uuid = fateId.getTxUUIDStr();
-      if (uuid.startsWith("1") || uuid.startsWith("5")) {
-        assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
-      } else {
-        assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+      for (var fateId : fateIds) {
+        var uuid = fateId.getTxUUIDStr();
+        if (uuid.startsWith("1") || uuid.startsWith("5")) {
+          assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+        } else {
+          assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+        }
       }
-    }
 
-    // start processing uuids that start with e
-    fate.setPartitions(Set.of(newPartition(store.type(), "e")));
-    Wait.waitFor(() -> fateIds.stream().filter(fateId -> 
fateId.getTxUUIDStr().startsWith("e"))
-        .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status == 
SUCCESSFUL));
+      // start processing uuids that start with e
+      fate.setPartitions(Set.of(newPartition(store.type(), "e")));
+      Wait.waitFor(() -> fateIds.stream().filter(fateId -> 
fateId.getTxUUIDStr().startsWith("e"))
+          .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status 
== SUCCESSFUL));
 
-    for (var fateId : fateIds) {
-      var uuid = fateId.getTxUUIDStr();
-      if (uuid.startsWith("1") || uuid.startsWith("5") || 
uuid.startsWith("e")) {
-        assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
-      } else {
-        assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+      for (var fateId : fateIds) {
+        var uuid = fateId.getTxUUIDStr();
+        if (uuid.startsWith("1") || uuid.startsWith("5") || 
uuid.startsWith("e")) {
+          assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+        } else {
+          assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+        }
       }
-    }
 
-    // add new ids to ensure that uuid prefixes 1 and 5 are no longer processed
-    Set<FateId> fateIds2 = new HashSet<>();
+      // add new ids to ensure that uuid prefixes 1 and 5 are no longer 
processed
+      Set<FateId> fateIds2 = new HashSet<>();
 
-    for (int i = 0; i < 100; i++) {
-      var txid = fate.startTransaction();
-      fateIds2.add(txid);
-      fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, "no 
goal");
-    }
-    Wait.waitFor(() -> fateIds2.stream().filter(fateId -> 
fateId.getTxUUIDStr().startsWith("e"))
-        .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status == 
SUCCESSFUL));
-    for (var fateId : fateIds2) {
-      var uuid = fateId.getTxUUIDStr();
-      if (uuid.startsWith("e")) {
-        assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
-      } else {
-        assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+      for (int i = 0; i < 100; i++) {
+        var txid = fate.startTransaction();
+        fateIds2.add(txid);
+        fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, 
"no goal");
+      }
+      Wait.waitFor(() -> fateIds2.stream().filter(fateId -> 
fateId.getTxUUIDStr().startsWith("e"))
+          .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status 
== SUCCESSFUL));
+      for (var fateId : fateIds2) {
+        var uuid = fateId.getTxUUIDStr();
+        if (uuid.startsWith("e")) {
+          assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+        } else {
+          assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+        }
       }
-    }
 
-    // nothing should have changed with the first set of ids
-    for (var fateId : fateIds) {
-      var uuid = fateId.getTxUUIDStr();
-      if (uuid.startsWith("1") || uuid.startsWith("5") || 
uuid.startsWith("e")) {
-        assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
-      } else {
-        assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+      // nothing should have changed with the first set of ids
+      for (var fateId : fateIds) {
+        var uuid = fateId.getTxUUIDStr();
+        if (uuid.startsWith("1") || uuid.startsWith("5") || 
uuid.startsWith("e")) {
+          assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+        } else {
+          assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+        }
       }
+    } finally {
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
   }
 

Reply via email to