This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 4a6ecfe0f3 Fixes bug with FateInterleavingIT.testInterleaving (#5214)
4a6ecfe0f3 is described below
commit 4a6ecfe0f34463039211311a2fa9872c65605ab9
Author: Kevin Rathbun <[email protected]>
AuthorDate: Mon Jan 27 10:06:35 2025 -0500
Fixes bug with FateInterleavingIT.testInterleaving (#5214)
- Rewrote the test. It seemed to expect an interleave to occur at every
opportunity, which may not always happen in 4.0. Now it just expects at least
one interleave to occur. This still accomplishes the same goal of the test:
verify that a fate thread interleaves work on multiple fate ids, and ensure the
order of operations for any given fate id is as expected even with interleaving.
- Renamed FateInterleavingIT/UserFateInterleavingIT/MetaFateInterleavingIT
to FateExecutionOrderIT/UserFateExecutionOrderIT/MetaFateExecutionOrderIT
to better indicate what is being tested.
---
...terleavingIT.java => FateExecutionOrderIT.java} | 157 ++++++++++++---------
...eavingIT.java => MetaFateExecutionOrderIT.java} | 6 +-
...eavingIT.java => UserFateExecutionOrderIT.java} | 6 +-
3 files changed, 96 insertions(+), 73 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
similarity index 66%
rename from
test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java
rename to
test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
index b4bf414682..9c1234efd4 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
@@ -26,7 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
import java.util.AbstractMap;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
@@ -54,7 +57,6 @@ import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -63,13 +65,13 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterators;
-public abstract class FateInterleavingIT extends SharedMiniClusterBase
- implements FateTestRunner<FateInterleavingIT.FilTestEnv> {
+public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
+ implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {
- public static class FilTestEnv extends TestEnv {
+ public static class FeoTestEnv extends TestEnv {
private final AccumuloClient client;
- public FilTestEnv(AccumuloClient client) {
+ public FeoTestEnv(AccumuloClient client) {
this.client = client;
}
@@ -78,11 +80,11 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
}
- public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> {
+ public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv>
{
private static final long serialVersionUID = 1L;
- protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String
step) throws Exception {
+ protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String
step) throws Exception {
try (Scanner scanner =
env.getClient().createScanner(FATE_TRACKING_TABLE)) {
return scanner.stream()
.anyMatch(e ->
e.getKey().getColumnFamily().toString().equals(tid.canonical())
@@ -90,7 +92,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
}
- protected static void insertTrackingData(FateId tid, FilTestEnv env,
String step)
+ protected static void insertTrackingData(FateId tid, FeoTestEnv env,
String step)
throws TableNotFoundException, MutationsRejectedException {
try (BatchWriter bw =
env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
Mutation mut = new Mutation(Long.toString(System.currentTimeMillis()));
@@ -100,13 +102,18 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
@Override
- public long isReady(FateId tid, FilTestEnv env) throws Exception {
+ public long isReady(FateId tid, FeoTestEnv env) throws Exception {
+ // First call to isReady will return that it's not ready (defer time of
100ms), inserting
+ // the data 'isReady1' so we know isReady was called once. The second
attempt (after the
+ // deferral time) will pass as ready (return 0) and insert the data
'isReady2' so we know
+ // the second call to isReady was made
Thread.sleep(50);
var step = this.getName() + "::isReady";
- if (isTrackingDataSet(tid, env, step)) {
+ if (isTrackingDataSet(tid, env, step + "1")) {
+ insertTrackingData(tid, env, step + "2");
return 0;
} else {
- insertTrackingData(tid, env, step);
+ insertTrackingData(tid, env, step + "1");
return 100;
}
}
@@ -117,14 +124,14 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::call");
return new SecondOp();
}
@Override
- public void undo(FateId fateId, FilTestEnv environment) throws Exception {
+ public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
throw new UnsupportedOperationException();
}
@@ -138,7 +145,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
private static final long serialVersionUID = 1L;
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws
Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws
Exception {
super.call(tid, environment);
return new LastOp();
}
@@ -149,7 +156,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
private static final long serialVersionUID = 1L;
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws
Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws
Exception {
super.call(tid, environment);
return null;
}
@@ -179,22 +186,22 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
}
- private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws
Exception {
+ private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws
Exception {
while (store.read(txid).getStatus() != SUCCESSFUL) {
Thread.sleep(50);
}
}
- protected Fate<FilTestEnv> initializeFate(AccumuloClient client,
FateStore<FilTestEnv> store) {
+ protected Fate<FeoTestEnv> initializeFate(AccumuloClient client,
FateStore<FeoTestEnv> store) {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- return new Fate<>(new FilTestEnv(client), store, false, r -> r + "",
config);
+ return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "",
config);
}
- private static Entry<String,String> toIdStep(Entry<Key,Value> e) {
- return new
AbstractMap.SimpleImmutableEntry<>(e.getKey().getColumnFamily().toString(),
- e.getValue().toString());
+ private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
+ return new AbstractMap.SimpleImmutableEntry<>(
+ FateId.from(e.getKey().getColumnFamily().toString()),
e.getValue().toString());
}
@Test
@@ -202,15 +209,19 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
executeTest(this::testInterleaving);
}
- protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext
sctx)
+ protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext
sctx)
throws Exception {
- // This test verifies that fates will interleave in time when their
isReady() returns >0 and
- // then 0.
+ // This test verifies that FATE will interleave at least once between fate
operations when
+ // their isReady() returns > 0. Interleaving is not guaranteed, so we just
check for one
+ // occurrence which is highly unlikely to fail unless something is broken
with FATE.
+ // This test also ensures that the expected order of operations occurs per
fate op.
+ // Interleaving should have no effect on this.
- FateId[] fateIds = new FateId[3];
+ final int numFateIds = 3;
+ FateId[] fateIds = new FateId[numFateIds];
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < numFateIds; i++) {
fateIds[i] = store.create();
var txStore = store.reserve(fateIds[i]);
try {
@@ -222,7 +233,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
}
- Fate<FilTestEnv> fate = null;
+ Fate<FeoTestEnv> fate = null;
// The execution order of the transactions is not according to their
insertion
// order. However, we do know that the first step of each transaction will
be
@@ -235,38 +246,48 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
waitFor(store, fateId);
}
- var expectedIds =
- Set.of(fateIds[0].canonical(), fateIds[1].canonical(),
fateIds[2].canonical());
-
Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
- Iterator<Entry<String,String>> iter =
scanner.stream().map(FateInterleavingIT::toIdStep)
- .filter(e -> e.getValue().contains("::call")).iterator();
-
- SortedMap<String,String> subset = new TreeMap<>();
-
- Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(),
e.getValue()));
-
- // Should see the call() for the first steps of all three fates come
first in time
- assertTrue(subset.values().stream().allMatch(v ->
v.startsWith("FirstOp")));
- assertEquals(expectedIds, subset.keySet());
-
- subset.clear();
-
- Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(),
e.getValue()));
-
- // Should see the call() for the second steps of all three fates come
second in time
- assertTrue(subset.values().stream().allMatch(v ->
v.startsWith("SecondOp")));
- assertEquals(expectedIds, subset.keySet());
-
- subset.clear();
-
- Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(),
e.getValue()));
-
- // Should see the call() for the last steps of all three fates come last
in time
- assertTrue(subset.values().stream().allMatch(v ->
v.startsWith("LastOp")));
- assertEquals(expectedIds, subset.keySet());
+ var iter =
scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();
+
+ // we should see the following execution order for all fate ids:
+ // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
+ // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
+ // LastOp::isReady1, LastOp::isReady2, LastOp::call
+ // the first isReady of each op will defer the op to be executed later,
allowing for the FATE
+ // thread to interleave and work on another fate id, but may not always
interleave.
+ // It is unlikely that the FATE will not interleave at least once in a
run, so we will check
+ // for at least one occurrence.
+ int interleaves = 0;
+ int i = 0;
+ Map.Entry<FateId,String> prevOp = null;
+ var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2",
"FirstOp::call",
+ "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call",
"LastOp::isReady1",
+ "LastOp::isReady2", "LastOp::call");
+ var fateIdsToExpRunOrder = Map.of(fateIds[0], new
ArrayList<>(expRunOrder), fateIds[1],
+ new ArrayList<>(expRunOrder), fateIds[2], new
ArrayList<>(expRunOrder));
+
+ while (iter.hasNext()) {
+ var currOp = iter.next();
+ FateId fateId = currOp.getKey();
+ String currStep = currOp.getValue();
+ var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);
+
+ boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
+ boolean prevFateIdDiffered = prevOp != null &&
!prevOp.getKey().equals(fateId);
+ if (passedFirstStep && prevFateIdDiffered) {
+ interleaves++;
+ }
+ assertEquals(currStep, expRunOrderFateId.remove(0));
+ prevOp = currOp;
+ i++;
+ }
- assertFalse(iter.hasNext());
+ assertTrue(interleaves > 0);
+ assertEquals(i, expRunOrder.size() * numFateIds);
+ assertEquals(numFateIds, fateIdsToExpRunOrder.size());
+ for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) {
+ assertTrue(expRunOrderFateId.isEmpty());
+ }
} finally {
if (fate != null) {
@@ -280,14 +301,14 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
private static final long serialVersionUID = 1L;
@Override
- public long isReady(FateId tid, FilTestEnv env) throws Exception {
+ public long isReady(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::isReady");
return 0;
}
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv manager) throws
Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws
Exception {
Thread.sleep(50);
insertTrackingData(tid, manager, this.getName() + "::call");
return new SecondNonInterleavingOp();
@@ -299,7 +320,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
private static final long serialVersionUID = 1L;
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws
Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws
Exception {
super.call(tid, environment);
return new LastNonInterleavingOp();
}
@@ -311,7 +332,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
private static final long serialVersionUID = 1L;
@Override
- public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws
Exception {
+ public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws
Exception {
super.call(tid, environment);
return null;
}
@@ -323,15 +344,16 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
executeTest(this::testNonInterleaving);
}
- protected void testNonInterleaving(FateStore<FilTestEnv> store,
ServerContext sctx)
+ protected void testNonInterleaving(FateStore<FeoTestEnv> store,
ServerContext sctx)
throws Exception {
// This test ensures that when isReady() always returns zero that all the
fate steps will
// execute immediately
- FateId[] fateIds = new FateId[3];
+ final int numFateIds = 3;
+ FateId[] fateIds = new FateId[numFateIds];
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < numFateIds; i++) {
fateIds[i] = store.create();
var txStore = store.reserve(fateIds[i]);
try {
@@ -343,7 +365,7 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
}
}
- Fate<FilTestEnv> fate = null;
+ Fate<FeoTestEnv> fate = null;
// The execution order of the transactions is not according to their
insertion
// order. In this case, without interleaving, a transaction will run start
to finish
@@ -386,10 +408,11 @@ public abstract class FateInterleavingIT extends
SharedMiniClusterBase
Text fateId = subset.keySet().iterator().next().getColumnFamily();
assertTrue(subset.keySet().stream().allMatch(k ->
k.getColumnFamily().equals(fateId)));
- var expectedVals = Set.of("FirstNonInterleavingOp::isReady",
"FirstNonInterleavingOp::call",
+ // list is used to ensure correct operations and correct order of
operations
+ var expectedVals = List.of("FirstNonInterleavingOp::isReady",
"FirstNonInterleavingOp::call",
"SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
"LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
- var actualVals =
subset.values().stream().map(Value::toString).collect(Collectors.toSet());
+ var actualVals =
subset.values().stream().map(Value::toString).collect(Collectors.toList());
assertEquals(expectedVals, actualVals);
return FateId.from(fateId.toString());
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
similarity index 90%
rename from
test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
rename to
test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
index e24fc0af5b..99d14656ec 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
@@ -28,16 +28,16 @@ import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.fate.FateInterleavingIT;
+import org.apache.accumulo.test.fate.FateExecutionOrderIT;
-public class MetaFateInterleavingIT extends FateInterleavingIT {
+public class MetaFateExecutionOrderIT extends FateExecutionOrderIT {
// put the fate data for the test in a different location than what accumulo
is using
private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
private static final String ZK_ROOT = ZooUtil.getRoot(IID);
@Override
- public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int
maxDeferred,
+ public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int
maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
ServerContext sctx = getCluster().getServerContext();
String path = ZK_ROOT + Constants.ZFATE;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
similarity index 90%
rename from
test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
rename to
test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
index 3a0aaeecd5..ec16596df6 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
@@ -25,11 +25,11 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
-import org.apache.accumulo.test.fate.FateInterleavingIT;
+import org.apache.accumulo.test.fate.FateExecutionOrderIT;
-public class UserFateInterleavingIT extends FateInterleavingIT {
+public class UserFateExecutionOrderIT extends FateExecutionOrderIT {
@Override
- public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int
maxDeferred,
+ public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int
maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
var table = getUniqueNames(1)[0];
try (ClientContext client =