This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 4a6ecfe0f3 Fixes bug with FateInterleavingIT.testInterleaving (#5214) 4a6ecfe0f3 is described below commit 4a6ecfe0f34463039211311a2fa9872c65605ab9 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Mon Jan 27 10:06:35 2025 -0500 Fixes bug with FateInterleavingIT.testInterleaving (#5214) - Rewrote the test. Seemed to expect an interleave to occur at every opportunity when this may not always occur in 4.0. Now, just expect at least one interleave to occur. This still accomplishes the same goal of the test: test a fate thread interleaving work on multiple fate ids and ensure the order of operations for any given fate id is as expected even with interleaving. - Renamed FateInterleavingIT/UserFateInterleavingIT/MetaFateInterleavingIT to FateExecutionOrderIT to better indicate what is being tested. --- ...terleavingIT.java => FateExecutionOrderIT.java} | 157 ++++++++++++--------- ...eavingIT.java => MetaFateExecutionOrderIT.java} | 6 +- ...eavingIT.java => UserFateExecutionOrderIT.java} | 6 +- 3 files changed, 96 insertions(+), 73 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java similarity index 66% rename from test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java rename to test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java index b4bf414682..9c1234efd4 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java @@ -26,7 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; @@ -54,7 +57,6 @@ import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -63,13 +65,13 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.Iterators; -public abstract class FateInterleavingIT extends SharedMiniClusterBase - implements FateTestRunner<FateInterleavingIT.FilTestEnv> { +public abstract class FateExecutionOrderIT extends SharedMiniClusterBase + implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> { - public static class FilTestEnv extends TestEnv { + public static class FeoTestEnv extends TestEnv { private final AccumuloClient client; - public FilTestEnv(AccumuloClient client) { + public FeoTestEnv(AccumuloClient client) { this.client = client; } @@ -78,11 +80,11 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } } - public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> { + public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> { private static final long serialVersionUID = 1L; - protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String step) throws Exception { + protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception { try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) { return scanner.stream() .anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical()) @@ -90,7 +92,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } } - protected static void insertTrackingData(FateId tid, FilTestEnv env, String step) + protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step) throws TableNotFoundException, MutationsRejectedException { try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) { Mutation mut = new Mutation(Long.toString(System.currentTimeMillis())); @@ -100,13 +102,18 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } @Override - public long isReady(FateId tid, FilTestEnv env) throws Exception { + public long isReady(FateId tid, FeoTestEnv env) throws Exception { + // First call to isReady will return that it's not ready (defer time of 100ms), inserting + // the data 'isReady1' so we know isReady was called once. The second attempt (after the + // deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know + // the second call to isReady was made Thread.sleep(50); var step = this.getName() + "::isReady"; - if (isTrackingDataSet(tid, env, step)) { + if (isTrackingDataSet(tid, env, step + "1")) { + insertTrackingData(tid, env, step + "2"); return 0; } else { - insertTrackingData(tid, env, step); + insertTrackingData(tid, env, step + "1"); return 100; } } @@ -117,14 +124,14 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception { Thread.sleep(50); insertTrackingData(tid, env, this.getName() + "::call"); return new SecondOp(); } @Override - public void undo(FateId fateId, FilTestEnv environment) throws Exception { + public void undo(FateId fateId, FeoTestEnv environment) throws Exception { throw new UnsupportedOperationException(); } @@ -138,7 +145,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase private static final long serialVersionUID = 1L; @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { super.call(tid, environment); return new LastOp(); } @@ -149,7 +156,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase private static final long serialVersionUID = 1L; @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { super.call(tid, environment); return null; } @@ -179,22 +186,22 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } } - private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws Exception { + private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception { while (store.read(txid).getStatus() != SUCCESSFUL) { Thread.sleep(50); } } - protected Fate<FilTestEnv> initializeFate(AccumuloClient client, FateStore<FilTestEnv> store) { + protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new FilTestEnv(client), store, false, r -> r + "", config); + return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", config); } - private static Entry<String,String> toIdStep(Entry<Key,Value> e) { - return new AbstractMap.SimpleImmutableEntry<>(e.getKey().getColumnFamily().toString(), - e.getValue().toString()); + private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) { + return new AbstractMap.SimpleImmutableEntry<>( + FateId.from(e.getKey().getColumnFamily().toString()), e.getValue().toString()); } @Test @@ -202,15 +209,19 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase executeTest(this::testInterleaving); } - protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx) + protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) throws Exception { - // This test verifies that fates will interleave in time when their isReady() returns >0 and - // then 0. + // This test verifies that FATE will interleave at least once between fate operations when + // their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one + // occurrence which is highly unlikely to fail unless something is broken with FATE. + // This test also ensures that the expected order of operations occurs per fate op. + // Interleaving should have no effect on this. - FateId[] fateIds = new FateId[3]; + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < numFateIds; i++) { fateIds[i] = store.create(); var txStore = store.reserve(fateIds[i]); try { @@ -222,7 +233,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } } - Fate<FilTestEnv> fate = null; + Fate<FeoTestEnv> fate = null; // The execution order of the transactions is not according to their insertion // order. However, we do know that the first step of each transaction will be @@ -235,38 +246,48 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase waitFor(store, fateId); } - var expectedIds = - Set.of(fateIds[0].canonical(), fateIds[1].canonical(), fateIds[2].canonical()); - Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); - Iterator<Entry<String,String>> iter = scanner.stream().map(FateInterleavingIT::toIdStep) - .filter(e -> e.getValue().contains("::call")).iterator(); - - SortedMap<String,String> subset = new TreeMap<>(); - - Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); - - // Should see the call() for the first steps of all three fates come first in time - assertTrue(subset.values().stream().allMatch(v -> v.startsWith("FirstOp"))); - assertEquals(expectedIds, subset.keySet()); - - subset.clear(); - - Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); - - // Should see the call() for the second steps of all three fates come second in time - assertTrue(subset.values().stream().allMatch(v -> v.startsWith("SecondOp"))); - assertEquals(expectedIds, subset.keySet()); - - subset.clear(); - - Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); - - // Should see the call() for the last steps of all three fates come last in time - assertTrue(subset.values().stream().allMatch(v -> v.startsWith("LastOp"))); - assertEquals(expectedIds, subset.keySet()); + var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator(); + + // we should see the following execution order for all fate ids: + // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call, + // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call, + // LastOp::isReady1, LastOp::isReady2, LastOp::call + // the first isReady of each op will defer the op to be executed later, allowing for the FATE + // thread to interleave and work on another fate id, but may not always interleave. + // It is unlikely that the FATE will not interleave at least once in a run, so we will check + // for at least one occurrence. + int interleaves = 0; + int i = 0; + Map.Entry<FateId,String> prevOp = null; + var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", "FirstOp::call", + "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", "LastOp::isReady1", + "LastOp::isReady2", "LastOp::call"); + var fateIdsToExpRunOrder = Map.of(fateIds[0], new ArrayList<>(expRunOrder), fateIds[1], + new ArrayList<>(expRunOrder), fateIds[2], new ArrayList<>(expRunOrder)); + + while (iter.hasNext()) { + var currOp = iter.next(); + FateId fateId = currOp.getKey(); + String currStep = currOp.getValue(); + var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId); + + boolean passedFirstStep = !currStep.equals(expRunOrder.get(0)); + boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId); + if (passedFirstStep && prevFateIdDiffered) { + interleaves++; + } + assertEquals(currStep, expRunOrderFateId.remove(0)); + prevOp = currOp; + i++; + } - assertFalse(iter.hasNext()); + assertTrue(interleaves > 0); + assertEquals(i, expRunOrder.size() * numFateIds); + assertEquals(numFateIds, fateIdsToExpRunOrder.size()); + for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) { + assertTrue(expRunOrderFateId.isEmpty()); + } } finally { if (fate != null) { @@ -280,14 +301,14 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase private static final long serialVersionUID = 1L; @Override - public long isReady(FateId tid, FilTestEnv env) throws Exception { + public long isReady(FateId tid, FeoTestEnv env) throws Exception { Thread.sleep(50); insertTrackingData(tid, env, this.getName() + "::isReady"); return 0; } @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv manager) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception { Thread.sleep(50); insertTrackingData(tid, manager, this.getName() + "::call"); return new SecondNonInterleavingOp(); @@ -299,7 +320,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase private static final long serialVersionUID = 1L; @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { super.call(tid, environment); return new LastNonInterleavingOp(); } @@ -311,7 +332,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase private static final long serialVersionUID = 1L; @Override - public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { super.call(tid, environment); return null; } @@ -323,15 +344,16 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase executeTest(this::testNonInterleaving); } - protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sctx) + protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) throws Exception { // This test ensures that when isReady() always returns zero that all the fate steps will // execute immediately - FateId[] fateIds = new FateId[3]; + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < numFateIds; i++) { fateIds[i] = store.create(); var txStore = store.reserve(fateIds[i]); try { @@ -343,7 +365,7 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase } } - Fate<FilTestEnv> fate = null; + Fate<FeoTestEnv> fate = null; // The execution order of the transactions is not according to their insertion // order. In this case, without interleaving, a transaction will run start to finish @@ -386,10 +408,11 @@ public abstract class FateInterleavingIT extends SharedMiniClusterBase Text fateId = subset.keySet().iterator().next().getColumnFamily(); assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId))); - var expectedVals = Set.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call", + // list is used to ensure correct operations and correct order of operations + var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call", "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call", "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call"); - var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toSet()); + var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toList()); assertEquals(expectedVals, actualVals); return FateId.from(fateId.toString()); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java similarity index 90% rename from test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java rename to test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java index e24fc0af5b..99d14656ec 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java @@ -28,16 +28,16 @@ import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.test.fate.FateInterleavingIT; +import org.apache.accumulo.test.fate.FateExecutionOrderIT; -public class MetaFateInterleavingIT extends FateInterleavingIT { +public class MetaFateExecutionOrderIT extends FateExecutionOrderIT { // put the fate data for the test in a different location than what accumulo is using private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); private static final String ZK_ROOT = ZooUtil.getRoot(IID); @Override - public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred, + public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { ServerContext sctx = getCluster().getServerContext(); String path = ZK_ROOT + Constants.ZFATE; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java similarity index 90% rename from test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java rename to test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java index 3a0aaeecd5..ec16596df6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java @@ -25,11 +25,11 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; -import org.apache.accumulo.test.fate.FateInterleavingIT; +import org.apache.accumulo.test.fate.FateExecutionOrderIT; -public class UserFateInterleavingIT extends FateInterleavingIT { +public class UserFateExecutionOrderIT extends FateExecutionOrderIT { @Override - public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred, + public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { var table = getUniqueNames(1)[0]; try (ClientContext client =