This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 66d4192d7d Fixes MetaFateII (#6269)
66d4192d7d is described below
commit 66d4192d7dd0ce0b63d9d51e0f6a798b805a3d08
Author: Keith Turner <[email protected]>
AuthorDate: Fri Mar 27 14:50:44 2026 -0700
Fixes MetaFateII (#6269)
Some test methods were not shutting fate down which left its thread
running and would interfere with other test.
---
.../compaction/ExternalCompactionTestUtils.java | 7 -
.../org/apache/accumulo/test/fate/FateITBase.java | 217 +++++++++++----------
2 files changed, 113 insertions(+), 111 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 07d3ea60fb..778ca66f8c 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -75,10 +74,8 @@ import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
-import org.apache.thrift.transport.TTransportException;
import com.beust.jcommander.internal.Maps;
-import com.google.common.net.HostAndPort;
public class ExternalCompactionTestUtils {
@@ -306,10 +303,6 @@ public class ExternalCompactionTestUtils {
public static int confirmCompactionRunning(ServerContext ctx,
Set<ExternalCompactionId> ecids)
throws Exception {
int matches = 0;
- Optional<HostAndPort> coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(ctx);
- if (coordinatorHost.isEmpty()) {
- throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
- }
while (matches == 0) {
var running = ExternalCompactionTestUtils.getRunningCompactions(ctx);
for (ExternalCompactionId ecid : ecids) {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
index 1411e258cc..1f79505ff2 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java
@@ -580,58 +580,63 @@ public abstract class FateITBase extends
SharedMiniClusterBase implements FateTe
throws Exception {
Fate<TestEnv> fate = initializeFate(store);
- // Wait for the transaction runner to be scheduled.
- UtilWaitThread.sleep(3000);
+ try {
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
- callStarted = new CountDownLatch(1);
- finishCall = new CountDownLatch(1);
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
- FateId txid = fate.startTransaction();
- assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
+ FateId txid = fate.startTransaction();
+ assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
- fate.seedTransaction(TEST_FATE_OP, txid, new
TestRepo("testShutdownDoesNotFailTx"), true,
- "Test Op");
+ fate.seedTransaction(TEST_FATE_OP, txid, new
TestRepo("testShutdownDoesNotFailTx"), true,
+ "Test Op");
+
+ // The Fate operation could be in a SUBMITTED state if the
+ // Fate transaction runner thread has not picked it up yet.
+ // If the thread has picked it up, then it will be in an
+ // IN_PROGRESS state, but will be waiting on the finishCall
+ // latch to be called to continue.
+ assertTrue(TStatus.SUBMITTED == getTxStatus(sctx, txid)
+ || TStatus.IN_PROGRESS == getTxStatus(sctx, txid));
+
+ // wait for call() to be called
+ callStarted.await();
+ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+
+ // shutdown fate
+ fate.shutdown(0, SECONDS);
+
+ // tell the op to exit the method
+ Wait.waitFor(() -> interruptedException.get() != null);
+ interruptedException.set(null);
+
+ // restart fate
+ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+ fate = initializeFate(store);
+ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+
+ // Restarting the transaction runners will retry the in-progress
+ // transaction. Reset the CountDownLatch's to confirm.
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
+
+ callStarted.await();
+ assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+ finishCall.countDown();
+
+ // This should complete normally, cleaning up the tx and deleting it
from ZK
+ TStatus status = getTxStatus(sctx, txid);
+ while (status != TStatus.UNKNOWN) {
+ Thread.sleep(100);
+ status = getTxStatus(sctx, txid);
+ }
+ assertNull(interruptedException.get());
+ } finally {
+ fate.shutdown(10, TimeUnit.MINUTES);
+ }
- // The Fate operation could be in a SUBMITTED state if the
- // Fate transaction runner thread has not picked it up yet.
- // If the thread has picked it up, then it will be in an
- // IN_PROGRESS state, but will be waiting on the finishCall
- // latch to be called to continue.
- assertTrue(TStatus.SUBMITTED == getTxStatus(sctx, txid)
- || TStatus.IN_PROGRESS == getTxStatus(sctx, txid));
-
- // wait for call() to be called
- callStarted.await();
- assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-
- // shutdown fate
- fate.shutdown(0, SECONDS);
-
- // tell the op to exit the method
- Wait.waitFor(() -> interruptedException.get() != null);
- interruptedException.set(null);
-
- // restart fate
- assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
- fate = initializeFate(store);
- assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
-
- // Restarting the transaction runners will retry the in-progress
- // transaction. Reset the CountDownLatch's to confirm.
- callStarted = new CountDownLatch(1);
- finishCall = new CountDownLatch(1);
-
- callStarted.await();
- assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
- finishCall.countDown();
-
- // This should complete normally, cleaning up the tx and deleting it from
ZK
- TStatus status = getTxStatus(sctx, txid);
- while (status != TStatus.UNKNOWN) {
- Thread.sleep(100);
- status = getTxStatus(sctx, txid);
- }
- assertNull(interruptedException.get());
}
public static class DoNothingRepo implements Repo<TestEnv> {
@@ -673,78 +678,82 @@ public abstract class FateITBase extends
SharedMiniClusterBase implements FateTe
// This test ensures that fate only processes fateids that fall within its
assigned partitions
// of fateids.
Fate<TestEnv> fate = initializeFate(store);
- fate.setPartitions(Set.of());
+ try {
+ fate.setPartitions(Set.of());
- Set<FateId> fateIds = new HashSet<>();
+ Set<FateId> fateIds = new HashSet<>();
- for (int i = 0; i < 100; i++) {
- var txid = fate.startTransaction();
- fateIds.add(txid);
+ for (int i = 0; i < 100; i++) {
+ var txid = fate.startTransaction();
+ fateIds.add(txid);
- fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, "no
goal");
- }
+ fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false,
"no goal");
+ }
- for (var fateId : fateIds) {
- assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
- }
+ for (var fateId : fateIds) {
+ assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ }
- // start processing all uuids that start with 1 or 5, but no other ids
- fate.setPartitions(Set.of(newPartition(store.type(), "1"),
newPartition(store.type(), "5")));
+ // start processing all uuids that start with 1 or 5, but no other ids
+ fate.setPartitions(Set.of(newPartition(store.type(), "1"),
newPartition(store.type(), "5")));
- Wait.waitFor(() -> fateIds.stream().filter(
- fateId -> fateId.getTxUUIDStr().startsWith("1") ||
fateId.getTxUUIDStr().startsWith("5"))
- .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status ==
SUCCESSFUL));
+ Wait.waitFor(() -> fateIds.stream().filter(
+ fateId -> fateId.getTxUUIDStr().startsWith("1") ||
fateId.getTxUUIDStr().startsWith("5"))
+ .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status
== SUCCESSFUL));
- for (var fateId : fateIds) {
- var uuid = fateId.getTxUUIDStr();
- if (uuid.startsWith("1") || uuid.startsWith("5")) {
- assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
- } else {
- assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ for (var fateId : fateIds) {
+ var uuid = fateId.getTxUUIDStr();
+ if (uuid.startsWith("1") || uuid.startsWith("5")) {
+ assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+ } else {
+ assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ }
}
- }
- // start processing uuids that start with e
- fate.setPartitions(Set.of(newPartition(store.type(), "e")));
- Wait.waitFor(() -> fateIds.stream().filter(fateId ->
fateId.getTxUUIDStr().startsWith("e"))
- .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status ==
SUCCESSFUL));
+ // start processing uuids that start with e
+ fate.setPartitions(Set.of(newPartition(store.type(), "e")));
+ Wait.waitFor(() -> fateIds.stream().filter(fateId ->
fateId.getTxUUIDStr().startsWith("e"))
+ .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status
== SUCCESSFUL));
- for (var fateId : fateIds) {
- var uuid = fateId.getTxUUIDStr();
- if (uuid.startsWith("1") || uuid.startsWith("5") ||
uuid.startsWith("e")) {
- assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
- } else {
- assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ for (var fateId : fateIds) {
+ var uuid = fateId.getTxUUIDStr();
+ if (uuid.startsWith("1") || uuid.startsWith("5") ||
uuid.startsWith("e")) {
+ assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+ } else {
+ assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ }
}
- }
- // add new ids to ensure that uuid prefixes 1 and 5 are no longer processed
- Set<FateId> fateIds2 = new HashSet<>();
+ // add new ids to ensure that uuid prefixes 1 and 5 are no longer
processed
+ Set<FateId> fateIds2 = new HashSet<>();
- for (int i = 0; i < 100; i++) {
- var txid = fate.startTransaction();
- fateIds2.add(txid);
- fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false, "no
goal");
- }
- Wait.waitFor(() -> fateIds2.stream().filter(fateId ->
fateId.getTxUUIDStr().startsWith("e"))
- .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status ==
SUCCESSFUL));
- for (var fateId : fateIds2) {
- var uuid = fateId.getTxUUIDStr();
- if (uuid.startsWith("e")) {
- assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
- } else {
- assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ for (int i = 0; i < 100; i++) {
+ var txid = fate.startTransaction();
+ fateIds2.add(txid);
+ fate.seedTransaction(TEST_FATE_OP, txid, new DoNothingRepo(), false,
"no goal");
+ }
+ Wait.waitFor(() -> fateIds2.stream().filter(fateId ->
fateId.getTxUUIDStr().startsWith("e"))
+ .map(fateId -> getTxStatus(sctx, fateId)).allMatch(status -> status
== SUCCESSFUL));
+ for (var fateId : fateIds2) {
+ var uuid = fateId.getTxUUIDStr();
+ if (uuid.startsWith("e")) {
+ assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+ } else {
+ assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ }
}
- }
- // nothing should have changed with the first set of ids
- for (var fateId : fateIds) {
- var uuid = fateId.getTxUUIDStr();
- if (uuid.startsWith("1") || uuid.startsWith("5") ||
uuid.startsWith("e")) {
- assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
- } else {
- assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ // nothing should have changed with the first set of ids
+ for (var fateId : fateIds) {
+ var uuid = fateId.getTxUUIDStr();
+ if (uuid.startsWith("1") || uuid.startsWith("5") ||
uuid.startsWith("e")) {
+ assertEquals(SUCCESSFUL, getTxStatus(sctx, fateId));
+ } else {
+ assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
+ }
}
+ } finally {
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}